/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.openservices.ots.internal.streamclient.lease;

import com.aliyun.openservices.ots.internal.streamclient.ClientConfig;
import com.aliyun.openservices.ots.internal.streamclient.DependencyException;
import com.aliyun.openservices.ots.internal.streamclient.StreamClientException;
import com.aliyun.openservices.ots.internal.streamclient.lease.Lease;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseRenewer;
import com.aliyun.openservices.ots.internal.streamclient.lease.LeaseTaker;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseManager;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseRenewer;
import com.aliyun.openservices.ots.internal.streamclient.lease.interfaces.ILeaseTaker;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives periodic lease taking and lease renewal for one worker.
 *
 * <p>A coordinator owns a {@link LeaseTaker} and a {@link LeaseRenewer} built on top of the
 * supplied {@link ILeaseManager}. After {@link #start()} two tasks run on a two-thread scheduled
 * executor: one takes leases and hands them to the renewer, the other renews the leases that the
 * renewer currently tracks. The outcome of the most recent rounds (last success time, last
 * failure) is exposed through getters and {@link #checkRenewerAndTakerStatus(long)}.
 *
 * <p>The status fields are {@code volatile} because they are written on the scheduler threads and
 * read from caller threads.
 *
 * @param <T> concrete lease type handled by this coordinator
 */
public class LeaseCoordinator<T extends Lease> {
    private static final Logger LOG = LoggerFactory.getLogger(LeaseCoordinator.class);
    private static final String THREAD_PREFIX = "LeaseCoordinator-";

    private final ILeaseManager<T> leaseManager;
    private final ILeaseRenewer<T> leaseRenewer;
    private final ILeaseTaker<T> leaseTaker;
    private final long renewerIntervalMillis;
    private final long takerIntervalMillis;
    private final long maxWaitTableReadyTimeMillis;
    private final int statusTableReadCapacity;
    private final int statusTableWriteCapacity;

    /** Guards the hand-over of taken leases to the renewer against a concurrent {@link #stop()}. */
    private final Object shutdownLock = new Object();

    private ScheduledExecutorService scheduledExecutorService;
    private volatile boolean running = false;

    // Written by the scheduler threads, read by arbitrary caller threads.
    private volatile Throwable lastTakeException;
    private volatile Throwable lastRenewException;
    private volatile long lastTimeOfSuccessfulTake;
    private volatile long lastTimeOfSuccessfulRenew;

    /**
     * Creates a coordinator; nothing is scheduled until {@link #start()} is called.
     *
     * <p>The renewer is given its own thread pool whose core and maximum size are both
     * {@code config.getRenewThreadPoolSize()}, backed by an unbounded queue; idle threads
     * (core threads included) are allowed to time out after 60 seconds.
     *
     * @param leaseManager     lease storage used by both the taker and the renewer
     * @param workerIdentifier identifier passed to the taker and the renewer
     * @param config           source of lease duration, intervals, pool size and table capacities
     */
    public LeaseCoordinator(ILeaseManager<T> leaseManager, String workerIdentifier, ClientConfig config) {
        int renewThreads = config.getRenewThreadPoolSize();
        ThreadPoolExecutor renewPool = new ThreadPoolExecutor(
                renewThreads, renewThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        renewPool.allowCoreThreadTimeOut(true);

        this.leaseManager = leaseManager;
        this.leaseTaker = new LeaseTaker<T>(
                leaseManager, workerIdentifier, config.getLeaseDurationMillis(), config.isAutoStealLease());
        this.leaseRenewer = new LeaseRenewer<T>(
                leaseManager, workerIdentifier, config.getLeaseDurationMillis(), renewPool);
        this.renewerIntervalMillis = config.getRenewerIntervalMillis();
        this.takerIntervalMillis = config.getTakerIntervalMillis();
        this.maxWaitTableReadyTimeMillis = config.getMaxWaitTableReadyTimeMillis();
        this.statusTableReadCapacity = config.getStatusTableReadCapacity();
        this.statusTableWriteCapacity = config.getStatusTableWriteCapacity();
    }

    /**
     * @return the throwable caught by the most recent failed take round, or {@code null} if no
     *         take round has failed yet (the value is not cleared by a later success)
     */
    public Throwable getLastTakeException() {
        return this.lastTakeException;
    }

    /**
     * @return the throwable caught by the most recent failed renew round, or {@code null} if no
     *         renew round has failed yet (the value is not cleared by a later success)
     */
    public Throwable getLastRenewException() {
        return this.lastRenewException;
    }

    /**
     * @return wall-clock time in milliseconds recorded after the last successful take round;
     *         {@code 0} until a round succeeds or {@link #checkRenewerAndTakerStatus(long)} seeds it
     */
    public long getLastTimeOfSuccessfulTake() {
        return this.lastTimeOfSuccessfulTake;
    }

    /**
     * @return wall-clock time in milliseconds recorded after the last successful renew round;
     *         {@code 0} until a round succeeds or {@link #checkRenewerAndTakerStatus(long)} seeds it
     */
    public long getLastTimeOfSuccessfulRenew() {
        return this.lastTimeOfSuccessfulRenew;
    }

    /**
     * Initializes the renewer and starts the periodic taker and renewer tasks.
     *
     * <p>Both tasks are scheduled with zero initial delay, so the coordinator is marked as
     * running <em>before</em> they are submitted: {@link #runTaker()} only hands taken leases to
     * the renewer while {@link #isRunning()} is true, and a first round that finished before the
     * flag was set would otherwise drop the leases it had just taken.
     *
     * <p>If scheduling fails, the coordinator is rolled back through {@link #stop()} and the
     * failure is rethrown. This method does not guard against being called twice; a second call
     * replaces the executor reference without shutting the previous executor down.
     *
     * @throws StreamClientException propagated from the renewer's initialization
     * @throws DependencyException   propagated from the renewer's initialization
     */
    public void start() throws StreamClientException, DependencyException {
        this.leaseRenewer.initialize();

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
            private final AtomicInteger counter = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable runnable) {
                Thread thread = Executors.defaultThreadFactory().newThread(runnable);
                thread.setName(LeaseCoordinator.THREAD_PREFIX + this.counter.incrementAndGet());
                return thread;
            }
        });
        this.scheduledExecutorService = executor;
        // Publish the running state before any task can execute (volatile write also makes the
        // executor reference visible to a thread that observes running == true).
        this.running = true;
        try {
            executor.scheduleWithFixedDelay(new TakerRunnable(), 0L, this.takerIntervalMillis, TimeUnit.MILLISECONDS);
            executor.scheduleWithFixedDelay(new RenewerRunnable(), 0L, this.renewerIntervalMillis, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            // e.g. a non-positive interval: do not leave a half-started coordinator behind.
            this.stop();
            throw e;
        }
    }

    /**
     * Creates the lease (status) table if it does not exist yet and blocks until it is ready.
     *
     * <p>The table is created with the configured read/write capacities; the third argument
     * passed to the lease manager is {@code -1} (its meaning is defined by the lease manager —
     * NOTE(review): presumably "no expiry/TTL"; confirm against {@code ILeaseManager}).
     *
     * @throws StreamClientException propagated from the lease manager
     * @throws DependencyException   propagated from the lease manager
     */
    public void initialize() throws StreamClientException, DependencyException {
        boolean created = this.leaseManager.createLeaseTableIfNotExists(
                this.statusTableReadCapacity, this.statusTableWriteCapacity, -1);
        LOG.debug("Create status table, created: {}.", created);
        this.leaseManager.waitUntilTableReady(this.maxWaitTableReadyTimeMillis);
    }

    /**
     * @return {@code true} between a successful {@link #start()} and the following {@link #stop()}
     */
    public boolean isRunning() {
        return this.running;
    }

    /**
     * Stops the periodic tasks and clears the leases tracked by the renewer.
     *
     * <p>Does nothing (besides logging) when the coordinator is not running. The executor is shut
     * down with {@code shutdownNow()}; the held leases are then cleared and the running flag is
     * reset under the shutdown lock, so that a take round still in flight cannot add leases to
     * the renewer after they have been cleared.
     */
    public void stop() {
        LOG.info("Start stop lease coordinator.");
        if (!this.isRunning()) {
            LOG.info("Lease coordinator is not running.");
            return;
        }
        LOG.debug("Shutdown executor service for taker and renewer.");
        this.scheduledExecutorService.shutdownNow();
        LOG.debug("Executor service is shutdown.");
        synchronized (this.shutdownLock) {
            LOG.debug("Clear currently held leases as coordinator is stopped.");
            this.leaseRenewer.clearCurrentlyHeldLeases();
            this.running = false;
        }
        LOG.info("Lease coordinator is stopped.");
    }

    /**
     * Runs one take round and registers the taken leases with the renewer.
     *
     * <p>The registration is skipped when the coordinator has been stopped in the meantime.
     */
    private void runTaker() throws StreamClientException, DependencyException {
        Map<String, T> takenLeases = this.leaseTaker.takeLeases();
        synchronized (this.shutdownLock) {
            if (this.isRunning()) {
                this.leaseRenewer.addLeasesToRenew(takenLeases.values());
            }
        }
    }

    /** Runs one renew round by delegating to the renewer. */
    private void runRenewer() throws StreamClientException, DependencyException {
        this.leaseRenewer.renewLeases();
    }

    /**
     * @return the values of the renewer's currently-held lease map
     */
    public Collection<T> getCurrentlyHeldLeases() {
        return this.leaseRenewer.getCurrentlyHeldLeases().values();
    }

    /**
     * Looks up a single lease by key via the renewer.
     *
     * @param leaseKey key of the lease to look up
     * @return whatever the renewer returns for {@code leaseKey}
     */
    public T getCurrentlyHeldLease(String leaseKey) {
        return this.leaseRenewer.getCurrentlyHeldLease(leaseKey);
    }

    /**
     * Delegates a lease update to the renewer.
     *
     * @param lease           lease carrying the values to store
     * @param leaseIdentifier identifier forwarded to the renewer together with the lease
     * @return the renewer's result for the update
     * @throws StreamClientException propagated from the renewer
     * @throws DependencyException   propagated from the renewer
     */
    public boolean updateLease(T lease, String leaseIdentifier) throws StreamClientException, DependencyException {
        return this.leaseRenewer.updateLease(lease, leaseIdentifier);
    }

    /**
     * Delegates a lease transfer to the renewer.
     *
     * @param leaseKey        key of the lease to transfer
     * @param leaseIdentifier identifier forwarded to the renewer together with the key
     * @return the renewer's result for the transfer
     * @throws StreamClientException propagated from the renewer
     * @throws DependencyException   propagated from the renewer
     */
    public boolean transferLease(String leaseKey, String leaseIdentifier) throws StreamClientException, DependencyException {
        return this.leaseRenewer.transferLease(leaseKey, leaseIdentifier);
    }

    /**
     * Verifies that both the renewer and the taker have succeeded recently enough.
     *
     * <p>A success timestamp that is still {@code 0} is seeded with the current time, so the
     * allowed duration starts counting from the first call of this method. The elapsed time is
     * computed as {@code now - last} rather than {@code last + max}, which cannot overflow for
     * large values of {@code maxDurationBeforeLastSuccess}.
     *
     * @param maxDurationBeforeLastSuccess maximum tolerated age, in milliseconds, of the last
     *                                     successful renew and take
     * @throws StreamClientException if the last successful renew or take is older than allowed;
     *                               the cause is the last recorded renew/take exception (may be
     *                               {@code null})
     */
    public void checkRenewerAndTakerStatus(long maxDurationBeforeLastSuccess) throws StreamClientException {
        long now = System.currentTimeMillis();
        if (this.lastTimeOfSuccessfulRenew == 0L) {
            this.lastTimeOfSuccessfulRenew = now;
        }
        if (this.lastTimeOfSuccessfulTake == 0L) {
            this.lastTimeOfSuccessfulTake = now;
        }

        // Snapshot each volatile once so the check and the message agree.
        long lastRenew = this.lastTimeOfSuccessfulRenew;
        if (now - lastRenew > maxDurationBeforeLastSuccess) {
            throw new StreamClientException("Too long didn't renew. LastRenewTime: " + lastRenew + ".", this.lastRenewException);
        }
        long lastTake = this.lastTimeOfSuccessfulTake;
        if (now - lastTake > maxDurationBeforeLastSuccess) {
            throw new StreamClientException("Too long didn't take. LastTakeTime: " + lastTake + ".", this.lastTakeException);
        }
    }

    /** Periodic task: one renew round, recording either the success time or the failure. */
    private class RenewerRunnable
    implements Runnable {
        private RenewerRunnable() {
        }

        @Override
        public void run() {
            try {
                LeaseCoordinator.this.runRenewer();
                long finishedAt = System.currentTimeMillis();
                LeaseCoordinator.this.lastTimeOfSuccessfulRenew = finishedAt;
                LOG.debug("Period review lease at {}.", finishedAt);
            }
            catch (Throwable t) {
                // Catch everything: an escaping throwable would cancel the periodic task.
                LOG.error("Failed to renew lease: {}.", t);
                LeaseCoordinator.this.lastRenewException = t;
            }
        }
    }

    /** Periodic task: one take round, recording either the success time or the failure. */
    private class TakerRunnable
    implements Runnable {
        private TakerRunnable() {
        }

        @Override
        public void run() {
            try {
                LeaseCoordinator.this.runTaker();
                long finishedAt = System.currentTimeMillis();
                LeaseCoordinator.this.lastTimeOfSuccessfulTake = finishedAt;
                LOG.debug("Period take lease at {}.", finishedAt);
            }
            catch (Throwable t) {
                // Catch everything: an escaping throwable would cancel the periodic task.
                LOG.error("Failed to take lease: {}.", t);
                LeaseCoordinator.this.lastTakeException = t;
            }
        }
    }
}

