/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.DefaultLogHubCheckPointTracker;
import com.aliyun.openservices.loghub.client.FetchTaskResult;
import com.aliyun.openservices.loghub.client.FetchedLogGroup;
import com.aliyun.openservices.loghub.client.ITask;
import com.aliyun.openservices.loghub.client.InitTaskResult;
import com.aliyun.openservices.loghub.client.InitializeTask;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubFetchTask;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.ProcessTask;
import com.aliyun.openservices.loghub.client.ProcessTaskResult;
import com.aliyun.openservices.loghub.client.ShutDownTask;
import com.aliyun.openservices.loghub.client.TaskResult;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition;
import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
import com.aliyun.openservices.loghub.client.throttle.ResourceBarrier;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives consumption of a single LogHub shard as a small state machine
 * ({@link ConsumerStatus}): initialize, then alternately fetch and process log
 * groups, and finally shut down.
 *
 * <p>Work is submitted to the supplied {@link ExecutorService}; this class only
 * polls the resulting futures from {@link #consume(boolean)} and
 * {@link #shutdown()}. At most one fetch is kept in flight while fetched data is
 * buffered for processing.
 *
 * <p>This class performs no synchronization of its own.
 * NOTE(review): presumably all calls come from a single driver thread - confirm
 * against the caller.
 */
public class ShardConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);

    /** Bytes requested from the resource barrier before a fetch is submitted (0x1400000 = 20 MiB). */
    private static final long PRE_ALLOCATED_BYTES = 0x1400000L;

    /* Tiers used to pace fetches: the smaller the previous fetch, the longer the wait. */
    private static final int SMALL_FETCH_RAW_SIZE = 0x100000;
    private static final int SMALL_FETCH_COUNT = 100;
    private static final long SMALL_FETCH_INTERVAL_MILLIS = 500L;
    private static final int MEDIUM_FETCH_RAW_SIZE = 0x200000;
    private static final int MEDIUM_FETCH_COUNT = 500;
    private static final long MEDIUM_FETCH_INTERVAL_MILLIS = 200L;
    private static final int LARGE_FETCH_RAW_SIZE = 0x400000;
    private static final int LARGE_FETCH_COUNT = 1000;
    private static final long LARGE_FETCH_INTERVAL_MILLIS = 50L;

    /** A throttle message is logged once per this many throttled attempts. */
    private static final int THROTTLE_LOG_EVERY = 200;
    /** Continuous throttling beyond this duration is logged at error level. */
    private static final long THROTTLE_ERROR_THRESHOLD_MILLIS = 900000L;
    /** Minimum spacing between two sampled task-error log lines. */
    private static final long ERROR_LOG_INTERVAL_MILLIS = 5000L;

    private final int shardID;
    private final LogHubClientAdapter loghubClient;
    private final DefaultLogHubCheckPointTracker checkpointTracker;
    private final ILogHubProcessor processor;
    private final LogHubCursorPosition initialPosition;
    private final int startTime;
    private final int maxFetchLogGroupSize;
    private final ExecutorService executorService;
    private final LogHubConfig config;
    private final ResourceBarrier resourceBarrier;

    private ConsumerStatus currentStatus = ConsumerStatus.INITIALIZING;
    private ITask currentTask;
    private Future<TaskResult> taskFuture;
    private Future<TaskResult> fetchDataFuture;
    private String nextFetchCursor;
    private boolean shutdown = false;
    private FetchedLogGroup lastFetchedData;
    private long lastLogErrorTime = 0L;
    private long lastFetchTime = 0L;
    private int lastFetchCount = 0;
    private int lastFetchRawSize = 0;
    private int throttledCount = 0;
    private long lastUnThrottledTimeInMillis = 0L;

    /**
     * Creates a consumer for one shard.
     *
     * @param loghubClient    client handed to the init and fetch tasks
     * @param shardID         id of the shard to consume
     * @param processor       user processor handed to the init, process and shutdown tasks
     * @param executorService executor on which all tasks are submitted
     * @param config          consumer configuration (cursor position, start time, fetch size, ...)
     * @param heartBeat       heartbeat passed through to the checkpoint tracker
     * @param resourceBarrier barrier consulted before each fetch and updated as data is
     *                        fetched and discarded
     */
    public ShardConsumer(LogHubClientAdapter loghubClient, int shardID, ILogHubProcessor processor, ExecutorService executorService, LogHubConfig config, LogHubHeartBeat heartBeat, ResourceBarrier resourceBarrier) {
        this.loghubClient = loghubClient;
        this.shardID = shardID;
        this.initialPosition = config.getCursorPosition();
        this.startTime = config.GetCursorStartTime();
        this.processor = processor;
        this.checkpointTracker = new DefaultLogHubCheckPointTracker(loghubClient, config, heartBeat, shardID);
        this.executorService = executorService;
        this.maxFetchLogGroupSize = config.getMaxFetchLogGroupSize();
        this.config = config;
        this.resourceBarrier = resourceBarrier;
        this.lastUnThrottledTimeInMillis = System.currentTimeMillis();
    }

    /**
     * Advances the state machine by one step: collects a finished task, schedules
     * the next one and, while processing with no buffered data, polls or starts a
     * fetch.
     *
     * @param fetchAllowed whether a new fetch may be submitted during this call
     * @return {@code true} unless the consumer is processing, has no buffered data
     *         and {@link #fetchData(boolean)} reported {@code false}; in that case
     *         the result is {@code !fetchAllowed}
     */
    public boolean consume(boolean fetchAllowed) {
        checkAndGenerateNextTask();
        boolean needsData = currentStatus == ConsumerStatus.PROCESSING && lastFetchedData == null;
        if (needsData && !fetchData(fetchAllowed)) {
            return !fetchAllowed;
        }
        return true;
    }

    /**
     * Delegates to the checkpoint tracker of this shard.
     *
     * @param cursor     cursor to record
     * @param persistent forwarded unchanged to the tracker
     * @throws LogHubCheckPointException propagated from the tracker
     */
    public void saveCheckPoint(String cursor, boolean persistent) throws LogHubCheckPointException {
        checkpointTracker.saveCheckPoint(cursor, persistent);
    }

    /**
     * If no task is pending (none submitted, or the last one finished or was
     * cancelled), consumes its result, updates the status and submits the next task.
     */
    private void checkAndGenerateNextTask() {
        if (taskFuture != null && !taskFuture.isCancelled() && !taskFuture.isDone()) {
            return;
        }
        TaskResult result = getTaskResult(taskFuture, false);
        taskFuture = null;
        boolean taskSuccess = result != null && result.getException() == null;
        if (taskSuccess) {
            if (currentStatus == ConsumerStatus.INITIALIZING) {
                InitTaskResult initResult = (InitTaskResult) result;
                nextFetchCursor = initResult.getCursor();
                if (initResult.isCursorPersistent()) {
                    checkpointTracker.setInPersistentCheckPoint(nextFetchCursor);
                }
            } else if (result instanceof ProcessTaskResult) {
                String checkpoint = ((ProcessTaskResult) result).getRollBackCheckpoint();
                if (checkpoint != null && !checkpoint.isEmpty()) {
                    // The processor asked to roll back: drop the in-flight fetch and
                    // restart fetching from the requested checkpoint.
                    cancelCurrentFetch();
                    nextFetchCursor = checkpoint;
                }
            }
        }
        sampleLogError(result);
        updateStatus(taskSuccess);
        generateNextTask();
    }

    /**
     * Tries to reserve {@link #PRE_ALLOCATED_BYTES} from the resource barrier.
     *
     * @return {@code false} if the reservation succeeded, {@code true} if the fetch
     *         is throttled
     */
    private boolean checkThrottled() {
        long nowInMillis = System.currentTimeMillis();
        if (resourceBarrier.tryAcquire(PRE_ALLOCATED_BYTES)) {
            lastUnThrottledTimeInMillis = nowInMillis;
            return false;
        }
        ++throttledCount;
        // Only log every THROTTLE_LOG_EVERY-th throttled attempt to avoid flooding the log.
        if (throttledCount % THROTTLE_LOG_EVERY == 0) {
            throttledCount = 0;
            if (nowInMillis - lastUnThrottledTimeInMillis > THROTTLE_ERROR_THRESHOLD_MILLIS) {
                LOG.error("ShardConsumeThrottledWarning, Fetch request throttled more than 900 seconds, shard {}", shardID);
            } else {
                LOG.warn("Fetch request throttled, shard {}", shardID);
            }
        }
        return true;
    }

    /**
     * Decides whether a new fetch may be submitted now.
     *
     * @param hasError whether the fetch that just completed reported an exception
     * @return {@code true} only if there was no error, the pacing interval has
     *         elapsed and the resource barrier granted the pre-allocation
     */
    private boolean shouldFetchNext(boolean hasError) {
        if (hasError) {
            return false;
        }
        if (!fetchIntervalElapsed(System.currentTimeMillis())) {
            return false;
        }
        return !checkThrottled();
    }

    /**
     * Applies the pacing tiers: the previous fetch's raw size and log-group count
     * select a minimum interval (500/200/50 ms); a fetch at or above the largest
     * tier, or one that reached {@code maxFetchLogGroupSize}, is never delayed.
     */
    private boolean fetchIntervalElapsed(long now) {
        long elapsed = now - lastFetchTime;
        boolean belowGroupLimit = lastFetchCount < maxFetchLogGroupSize;
        if (lastFetchRawSize < SMALL_FETCH_RAW_SIZE && lastFetchCount < SMALL_FETCH_COUNT && belowGroupLimit) {
            return elapsed > SMALL_FETCH_INTERVAL_MILLIS;
        }
        if (lastFetchRawSize < MEDIUM_FETCH_RAW_SIZE && lastFetchCount < MEDIUM_FETCH_COUNT && belowGroupLimit) {
            return elapsed > MEDIUM_FETCH_INTERVAL_MILLIS;
        }
        if (lastFetchRawSize < LARGE_FETCH_RAW_SIZE && lastFetchCount < LARGE_FETCH_COUNT && belowGroupLimit) {
            return elapsed > LARGE_FETCH_INTERVAL_MILLIS;
        }
        return true;
    }

    /**
     * Polls the in-flight fetch (if any), buffers its data on success, and submits
     * the next fetch when allowed.
     *
     * @param fetchAllowed whether a new fetch may be submitted
     * @return {@code true} if a fetch is pending, was just submitted, or a cancelled
     *         fetch was cleaned up; {@code false} if no fetch was submitted
     */
    private boolean fetchData(boolean fetchAllowed) {
        boolean hasError = false;
        if (fetchDataFuture != null) {
            if (fetchDataFuture.isCancelled()) {
                fetchDataFuture = null;
                lastFetchedData = null;
                resourceBarrier.release(PRE_ALLOCATED_BYTES);
                return true;
            }
            if (!fetchDataFuture.isDone()) {
                return true;
            }
            TaskResult result = getTaskResult(fetchDataFuture, false);
            if (result != null && result.getException() == null) {
                FetchTaskResult fetchResult = (FetchTaskResult) result;
                lastFetchedData = new FetchedLogGroup(shardID, fetchResult.getFetchedData(), fetchResult.getCursor());
                nextFetchCursor = fetchResult.getCursor();
                lastFetchCount = lastFetchedData.getFetchedData().size();
                lastFetchRawSize = fetchResult.getRawSize();
                // Adjust the pre-allocation to the actual raw size of the fetched data.
                resourceBarrier.acquire((long) lastFetchRawSize - PRE_ALLOCATED_BYTES);
            } else {
                resourceBarrier.release(PRE_ALLOCATED_BYTES);
            }
            // Evaluated for both outcomes: a failed fetch must be logged and must
            // suppress the immediate re-fetch below.
            sampleLogError(result);
            hasError = result != null && result.getException() != null;
        }
        if (!fetchAllowed || !shouldFetchNext(hasError)) {
            fetchDataFuture = null;
            return false;
        }
        lastFetchTime = System.currentTimeMillis();
        LogHubFetchTask task = new LogHubFetchTask(loghubClient, shardID, nextFetchCursor, config);
        fetchDataFuture = executorService.submit(task);
        return true;
    }

    /**
     * Logs the exception carried by {@code result}, at most once every
     * {@link #ERROR_LOG_INTERVAL_MILLIS}.
     *
     * @param result task result to inspect; may be {@code null}
     */
    private void sampleLogError(TaskResult result) {
        if (result == null || result.getException() == null) {
            return;
        }
        long curTime = System.currentTimeMillis();
        if (curTime - lastLogErrorTime > ERROR_LOG_INTERVAL_MILLIS) {
            LOG.warn("", result.getException());
            lastLogErrorTime = curTime;
        }
    }

    /**
     * Returns the result of a finished future without blocking on unfinished ones.
     *
     * @param future   future to read; may be {@code null}
     * @param canceled whether a cancellation is expected (suppresses the warning)
     * @return the task result, or {@code null} if the future is {@code null}, not
     *         finished, cancelled, or its result could not be retrieved
     */
    private TaskResult getTaskResult(Future<TaskResult> future, boolean canceled) {
        if (future == null || !(future.isDone() || future.isCancelled())) {
            return null;
        }
        try {
            return future.get();
        }
        catch (CancellationException ex) {
            if (!canceled) {
                LOG.warn("Task was been unexpected canceled");
            }
        }
        catch (InterruptedException ex) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOG.error("Error retrieving task result", ex);
        }
        catch (Exception ex) {
            LOG.error("Error retrieving task result", ex);
        }
        return null;
    }

    /**
     * Cancels the in-flight fetch, if any, discards buffered data and returns the
     * fetch's pre-allocation to the resource barrier.
     *
     * <p>NOTE(review): buffered {@code lastFetchedData} is dropped here without
     * releasing {@code lastFetchRawSize}, unlike the shutdown path in
     * {@link #generateNextTask()} - confirm the barrier accounting is intended.
     */
    private void cancelCurrentFetch() {
        if (fetchDataFuture == null) {
            return;
        }
        fetchDataFuture.cancel(true);
        getTaskResult(fetchDataFuture, true);
        fetchDataFuture = null;
        lastFetchedData = null;
        resourceBarrier.release(PRE_ALLOCATED_BYTES);
        LOG.info("Cancel a fetch task, shard id: {}", shardID);
    }

    /**
     * Creates and submits the task that matches the current status. While
     * processing with no buffered data, nothing is submitted.
     */
    private void generateNextTask() {
        ITask nextTask = null;
        if (currentStatus == ConsumerStatus.INITIALIZING) {
            nextTask = new InitializeTask(processor, loghubClient, shardID, initialPosition, startTime);
        } else if (currentStatus == ConsumerStatus.PROCESSING) {
            if (lastFetchedData != null) {
                checkpointTracker.setCursor(lastFetchedData.getEndCursor());
                nextTask = new ProcessTask(processor, lastFetchedData.getFetchedData(), checkpointTracker, lastFetchRawSize, resourceBarrier);
                lastFetchedData = null;
            }
        } else if (currentStatus == ConsumerStatus.SHUTTING_DOWN) {
            if (lastFetchedData != null) {
                resourceBarrier.release(lastFetchRawSize);
                // Clear the buffer so a retried shutdown task does not release the
                // same bytes a second time.
                lastFetchedData = null;
            }
            nextTask = new ShutDownTask(processor, checkpointTracker);
            cancelCurrentFetch();
        }
        if (nextTask != null) {
            currentTask = nextTask;
            taskFuture = executorService.submit(currentTask);
        }
    }

    /**
     * Moves the state machine forward after a task has finished.
     *
     * @param taskSuccess whether the finished task produced a result without exception
     */
    private void updateStatus(boolean taskSuccess) {
        if (currentStatus == ConsumerStatus.SHUTTING_DOWN) {
            // Shutdown completes once the shutdown task succeeded, or if no task was ever submitted.
            if (currentTask == null || taskSuccess) {
                currentStatus = ConsumerStatus.SHUTDOWN_COMPLETE;
            }
        } else if (shutdown) {
            currentStatus = ConsumerStatus.SHUTTING_DOWN;
        } else if (taskSuccess && currentStatus == ConsumerStatus.INITIALIZING) {
            currentStatus = ConsumerStatus.PROCESSING;
        }
    }

    /**
     * Requests shutdown and, unless shutdown has already completed, advances the
     * state machine once. Callers should poll {@link #isShutdown()} and call this
     * method again until it reports completion.
     */
    public void shutdown() {
        shutdown = true;
        if (!isShutdown()) {
            checkAndGenerateNextTask();
        }
    }

    /**
     * @return {@code true} once the status is {@link ConsumerStatus#SHUTDOWN_COMPLETE}
     */
    public boolean isShutdown() {
        return currentStatus == ConsumerStatus.SHUTDOWN_COMPLETE;
    }

    /**
     * @return {@code true} if unload-after-commit is disabled in the config, otherwise
     *         whether the checkpoint tracker reports everything as committed
     */
    boolean canBeUnloaded() {
        if (!config.isUnloadAfterCommitEnabled()) {
            return true;
        }
        return checkpointTracker.isAllCommitted();
    }

    /** Lifecycle states of a shard consumer, in the order they are normally traversed. */
    enum ConsumerStatus {
        INITIALIZING,
        PROCESSING,
        SHUTTING_DOWN,
        SHUTDOWN_COMPLETE;
    }
}

