/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.response.BatchGetLogResponse;
import com.aliyun.openservices.loghub.client.FetchTaskResult;
import com.aliyun.openservices.loghub.client.ITask;
import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LoghubClientUtil;
import com.aliyun.openservices.loghub.client.TaskResult;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import java.util.List;
import java.util.Locale;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that pulls one batch of log groups from a single shard, starting at a given cursor.
 *
 * <p>A failed fetch is retried at most once. When the first failure reports an invalid
 * cursor, the cursor is reset to the configured default position and the fetch is retried
 * immediately; any other first failure is logged and retried after a short pause. A second
 * failure is returned to the caller wrapped in a {@link TaskResult}.
 */
public class LogHubFetchTask implements ITask {
    private static final Logger LOG = LoggerFactory.getLogger(LogHubFetchTask.class);

    /** Number of retries allowed after the initial attempt. */
    private static final int MAX_RETRIES = 1;
    /** Pause between a failed attempt and its retry, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 200L;
    /** Lower-cased fragment of the error code that marks a cursor as invalid. */
    private static final String INVALID_CURSOR_CODE = "invalidcursor";

    private final LogHubClientAdapter loghubClient;
    private final int shardId;
    private final LogHubConfig config;
    /** Cursor to fetch from; replaced by {@link #resetCursor()} when it is reported invalid. */
    private String cursor;

    /**
     * Creates a fetch task for one shard.
     *
     * @param loghubClient client used to fetch log groups and to look up cursors
     * @param shardId      id of the shard to read from
     * @param cursor       cursor to start fetching from
     * @param config       supplies the maximum fetch size and the default cursor position
     */
    public LogHubFetchTask(LogHubClientAdapter loghubClient, int shardId, String cursor, LogHubConfig config) {
        this.loghubClient = loghubClient;
        this.shardId = shardId;
        this.cursor = cursor;
        this.config = config;
    }

    /**
     * Fetches one batch of log groups, retrying once on failure.
     *
     * @return a {@link FetchTaskResult} carrying the fetched log groups, the next cursor and the
     *         raw size on success; otherwise a {@link TaskResult} wrapping the last
     *         {@link LogException}
     */
    @Override
    public TaskResult call() {
        for (int attempt = 0; ; ++attempt) {
            try {
                return this.fetch();
            } catch (LogException lex) {
                // Only the very first failure may trigger a cursor reset; the retry then
                // proceeds immediately, without logging an error or sleeping.
                if (attempt == 0 && isInvalidCursor(lex)) {
                    this.resetCursor();
                    continue;
                }
                LOG.error("Fail to pull data from shard {}, cursor {}", this.shardId, this.cursor, lex);
                if (attempt >= MAX_RETRIES) {
                    return new TaskResult(lex);
                }
                LoghubClientUtil.sleep(RETRY_INTERVAL_MILLIS);
            }
        }
    }

    /**
     * Performs a single fetch from the current cursor.
     *
     * @return the fetch result; its next cursor falls back to the current cursor when the
     *         response does not provide one
     * @throws LogException if the underlying client call fails
     */
    private TaskResult fetch() throws LogException {
        BatchGetLogResponse response = this.loghubClient.BatchGetLogs(
                this.shardId, this.config.getMaxFetchLogGroupSize(), this.cursor);
        String nextCursor = response.GetNextCursor();
        LOG.debug("shard {}, cursor {}, next cursor {}, response size: {}",
                this.shardId, this.cursor, nextCursor, response.GetCount());
        // A missing or empty next cursor would lose the read position; keep the current one.
        if (nextCursor == null || nextCursor.isEmpty()) {
            LOG.info("Shard {} next cursor is empty, set to current cursor {}", this.shardId, this.cursor);
            nextCursor = this.cursor;
        }
        return new FetchTaskResult(response.GetLogGroups(), nextCursor, response.GetRawSize());
    }

    /**
     * Tells whether the exception's error code marks the cursor as invalid.
     *
     * <p>The comparison is case-insensitive and uses {@link Locale#ROOT} so the result does not
     * depend on the JVM default locale (e.g. the Turkish dotless-i lowercasing of {@code 'I'}).
     *
     * @param lex exception thrown by the fetch
     * @return {@code true} if the error code contains "invalidcursor"; {@code false} otherwise,
     *         including when the error code is {@code null}
     */
    private static boolean isInvalidCursor(LogException lex) {
        String errorCode = lex.GetErrorCode();
        return errorCode != null && errorCode.toLowerCase(Locale.ROOT).contains(INVALID_CURSOR_CODE);
    }

    /**
     * Replaces the current cursor with the default one derived from the configured cursor
     * position and start time. If the lookup fails, the error is logged and the current cursor
     * is left unchanged.
     */
    private void resetCursor() {
        try {
            String defaultCursor = this.loghubClient.getCursor(
                    this.shardId, this.config.getCursorPosition(), this.config.GetCursorStartTime());
            LOG.info("Invalid cursor {}, reset to default position {}", this.cursor, defaultCursor);
            this.cursor = defaultCursor;
        } catch (LogException ex) {
            LOG.error("Unable to reset cursor", ex);
        }
    }
}

