/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.loghub.client;

import com.aliyun.openservices.loghub.client.LogHubClientAdapter;
import com.aliyun.openservices.loghub.client.LogHubHeartBeat;
import com.aliyun.openservices.loghub.client.LogThreadFactory;
import com.aliyun.openservices.loghub.client.LoghubClientUtil;
import com.aliyun.openservices.loghub.client.ShardConsumer;
import com.aliyun.openservices.loghub.client.ShardFilter;
import com.aliyun.openservices.loghub.client.config.LogHubConfig;
import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
import com.aliyun.openservices.loghub.client.throttle.FixedResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.ResourceBarrier;
import com.aliyun.openservices.loghub.client.throttle.UnlimitedResourceBarrier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ClientWorker.class);
    private final ILogHubProcessorFactory processorFactory;
    private final LogHubConfig logHubConfig;
    private final LogHubHeartBeat logHubHeartBeat;
    private final Map<Integer, ShardConsumer> shardConsumer = new HashMap<Integer, ShardConsumer>();
    private final ExecutorService executorService;
    private LogHubClientAdapter loghubClient;
    private volatile boolean shutDown = false;
    private volatile boolean mainLoopExit = false;
    private ResourceBarrier resourceBarrier;
    private int lastFetchThrottleMinShard = 0;
    private ShardFilter shardFilter;

    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config) throws LogHubClientWorkerException {
        this(factory, config, null);
    }

    public ClientWorker(ILogHubProcessorFactory factory, LogHubConfig config, ExecutorService service) throws LogHubClientWorkerException {
        this.processorFactory = factory;
        this.logHubConfig = config;
        this.executorService = service == null ? Executors.newCachedThreadPool(new LogThreadFactory()) : service;
        this.loghubClient = new LogHubClientAdapter(config);
        this.loghubClient.createConsumerGroupIfNotExist(config);
        this.logHubHeartBeat = new LogHubHeartBeat(this.loghubClient, config);
        int dataSizeInMB = this.logHubConfig.getMaxInProgressingDataSizeInMB();
        this.resourceBarrier = dataSizeInMB > 0 ? new FixedResourceBarrier((long)dataSizeInMB * 1024L * 1024L) : new UnlimitedResourceBarrier();
    }

    public void SwitchClient(String accessKeyId, String accessKey) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), accessKeyId, accessKey, null);
    }

    public void SwitchClient(String accessKeyId, String accessKey, String stsToken) {
        this.loghubClient.SwitchClient(this.logHubConfig.getEndpoint(), accessKeyId, accessKey, stsToken);
    }

    public void setShardFilter(ShardFilter shardFilter) {
        this.shardFilter = shardFilter;
    }

    private List<Integer> filterAndSortShards(List<Integer> shards) {
        if (shards != null && this.shardFilter != null) {
            shards = this.shardFilter.filter(shards);
        }
        if (shards == null) {
            return Collections.emptyList();
        }
        Collections.sort(shards);
        return shards;
    }

    @Override
    public void run() {
        this.logHubHeartBeat.start();
        long fetchInterval = this.logHubConfig.getFetchIntervalMillis();
        while (!this.shutDown) {
            List<Integer> heldShards = this.logHubHeartBeat.getHeldShards();
            List<Integer> shards = this.filterAndSortShards(heldShards);
            int curFetchThrottleMinShard = -1;
            for (int shard : shards) {
                ShardConsumer consumer = this.consumerForShard(shard);
                if (consumer.consume(shard >= this.lastFetchThrottleMinShard) || curFetchThrottleMinShard >= 0) continue;
                curFetchThrottleMinShard = shard;
            }
            this.lastFetchThrottleMinShard = Math.max(curFetchThrottleMinShard, 0);
            this.cleanConsumer(heldShards);
            LoghubClientUtil.sleep(fetchInterval);
        }
        this.mainLoopExit = true;
    }

    public void shutdown() {
        this.shutDown = true;
        int times = 0;
        while (!this.mainLoopExit && times++ < 20) {
            LoghubClientUtil.sleep(1000L);
        }
        for (ShardConsumer consumer : this.shardConsumer.values()) {
            consumer.shutdown();
        }
        LoghubClientUtil.shutdownAndAwaitTermination(this.executorService, 30L);
        this.logHubHeartBeat.stop();
    }

    private void cleanConsumer(List<Integer> ownedShard) {
        ArrayList<Integer> removeShards = new ArrayList<Integer>();
        for (Map.Entry<Integer, ShardConsumer> shard : this.shardConsumer.entrySet()) {
            ShardConsumer consumer = shard.getValue();
            Integer shardID = shard.getKey();
            if (!ownedShard.contains(shardID)) {
                LOG.info("Shard {} has been assigned to another consumer.", (Object)shardID);
                if (consumer.canBeUnloaded()) {
                    LOG.info("Shutting down consumer of shard: {}", (Object)shardID);
                    consumer.shutdown();
                } else {
                    LOG.info("Shard {} cannot be unloaded as it's checkpoint has not been committed yet", (Object)shardID);
                }
            }
            if (!consumer.isShutdown()) continue;
            this.logHubHeartBeat.removeHeartShard(shardID);
            removeShards.add(shardID);
            LOG.info("Consumer of shard shutdown success: {}", (Object)shardID);
        }
        Iterator<Map.Entry<Integer, ShardConsumer>> iterator = removeShards.iterator();
        while (iterator.hasNext()) {
            int shard = (Integer)((Object)iterator.next());
            this.shardConsumer.remove(shard);
        }
    }

    private ShardConsumer consumerForShard(int shardId) {
        ShardConsumer consumer = this.shardConsumer.get(shardId);
        if (consumer != null) {
            return consumer;
        }
        consumer = new ShardConsumer(this.loghubClient, shardId, this.processorFactory.generatorProcessor(), this.executorService, this.logHubConfig, this.logHubHeartBeat, this.resourceBarrier);
        this.shardConsumer.put(shardId, consumer);
        LOG.info("Create a consumer for shard: {}", (Object)shardId);
        return consumer;
    }
}

