/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.clusterclient.impl;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.drc.client.DRCClient;
import com.aliyun.drc.client.DRCClientFactory;
import com.aliyun.drc.client.DataFilter;
import com.aliyun.drc.client.Listener;
import com.aliyun.drc.client.impl.Checkpoint;
import com.aliyun.drc.clusterclient.ClusterListener;
import com.aliyun.drc.clusterclient.impl.DrcClientListener;
import com.aliyun.drc.clusterclient.partition.PartitionImpl;
import com.aliyun.drc.clusterclient.partition.PartitionPool;
import com.aliyun.drc.regionmanager.RegionRouterInfo;
import com.aliyun.drc.util.AbnormalThreadHook;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCluster
implements AbnormalThreadHook {
    private static Logger logger = LoggerFactory.getLogger(ClientCluster.class);
    private String guid;
    private int listenerIndex = 0;
    private List<ClusterListener> listeners;
    private Map<String, DrcClientListener> clientListeners = new ConcurrentHashMap<String, DrcClientListener>();
    private Map<String, DRCClient> clientPools = new ConcurrentHashMap<String, DRCClient>();
    private Map<Thread, String> clientThreadPools = new ConcurrentHashMap<Thread, String>();
    private PartitionPool partitionPool;
    private RegionRouterInfo regionRouterInfo;

    public void doStart(JSONObject jsonObject) throws Exception {
        String partitionName = jsonObject.getJSONObject("partition").getString("name");
        String topic = jsonObject.getString("topic");
        String filter = (String)jsonObject.getJSONArray("tables").get(0);
        String offset = jsonObject.getString("offset");
        logger.info("Do start command, partition:" + partitionName + ", topic:" + topic + ", filter:" + filter + ", offset:" + offset);
        if (this.partitionPool.getPartition(partitionName) != null) {
            logger.warn("Partition " + partitionName + " already exists in PartitionPool, start command is ignored...");
            throw new Exception("Partition " + partitionName + " already exists in PartitionPool");
        }
        PartitionImpl partition = new PartitionImpl(partitionName);
        partition.setTopic(topic);
        ClusterListener clusterListener = this.getNextListener();
        DrcClientListener listener = new DrcClientListener(clusterListener, this, this.regionRouterInfo.getDataType());
        listener.setPartition(partition);
        Properties properties = new Properties();
        properties.put("manager.host", this.regionRouterInfo.getClusterUrl());
        properties.put("guid", this.guid);
        DRCClient client = DRCClientFactory.create((DRCClientFactory.Type)DRCClientFactory.Type.MYSQL, (Object)properties);
        StringBuffer dataFilterStr = new StringBuffer();
        for (String part : filter.split("\\|")) {
            dataFilterStr.append(part).append(".*|");
        }
        client.addDataFilter(new DataFilter(dataFilterStr.toString()));
        client.addListener((Listener)listener);
        com.aliyun.drc.clusterclient.partition.Checkpoint checkpoint = new com.aliyun.drc.clusterclient.partition.Checkpoint(offset);
        Checkpoint drcClientCheckpoint = new Checkpoint();
        if (checkpoint.getInstance() != null) {
            drcClientCheckpoint.setServerId(checkpoint.getInstance());
        }
        if (checkpoint.getFilePosition() != null) {
            drcClientCheckpoint.setPosition(checkpoint.getFilePosition());
        }
        if (checkpoint.getTimestamp() != null) {
            drcClientCheckpoint.setTimestamp(checkpoint.getTimestamp());
        }
        if (checkpoint.getId() != null) {
            drcClientCheckpoint.setRecordId(checkpoint.getId());
        }
        if (this.regionRouterInfo.isUsePublicIp()) {
            client.usePublicIp();
        }
        client.initService(this.regionRouterInfo.getUsername(), topic, this.regionRouterInfo.getPassword(), drcClientCheckpoint, null);
        Thread serviceThread = client.startService();
        serviceThread.setName("DTS-DRCClient-" + partitionName + "-Thread");
        serviceThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread arg0, Throwable arg1) {
                logger.error(arg0.getName() + ", " + arg1.toString());
            }
        });
        clusterListener.addListenedPartition(listener, partition);
        this.partitionPool.addPartition(partition);
        this.partitionPool.addMonitoredThread(serviceThread, this);
        this.clientListeners.put(partitionName, listener);
        this.clientPools.put(partitionName, client);
        this.clientThreadPools.put(serviceThread, partitionName);
        logger.info("Do start command finished, partition:" + partitionName + ", topic:" + topic + ", filter:" + filter + ", offset:" + offset);
    }

    public void doStop(String partition) {
        DRCClient client;
        logger.info("Do stop command, partition:" + partition);
        this.partitionPool.removePartition(partition);
        DrcClientListener removedClientListener = this.clientListeners.get(partition);
        if (removedClientListener != null) {
            removedClientListener.getListener().removeListenedPartition(removedClientListener, removedClientListener.getPartition());
            this.clientListeners.remove(partition);
        }
        if ((client = this.clientPools.get(partition)) != null) {
            try {
                client.stopService();
                this.clientPools.remove(partition);
            }
            catch (Exception e) {
                logger.error("stop drcclient service exception: " + partition, (Throwable)e);
            }
        } else {
            logger.warn(partition + " find null drcclient");
        }
        Iterator<Map.Entry<Thread, String>> iterator = this.clientThreadPools.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Thread, String> entry = iterator.next();
            if (!entry.getValue().equalsIgnoreCase(partition)) continue;
            if (!entry.getKey().isAlive()) {
                iterator.remove();
                break;
            }
            logger.error(entry.getKey().getName() + " still alive, stop drcclient service failed");
            break;
        }
        logger.info("Do stop command finished, partition:" + partition);
    }

    @Override
    public void notifyThreadFailed(Thread t) {
        String partition = this.clientThreadPools.get(t);
        if (partition != null) {
            logger.warn("monitor DRCClient thread is not alive, execute do stop... " + t.getName());
            this.doStop(partition);
        }
    }

    public void shutdown() throws Exception {
        Iterator<Map.Entry<String, DRCClient>> iterator = this.clientPools.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, DRCClient> entry = iterator.next();
            try {
                entry.getValue().stopService();
                iterator.remove();
            }
            catch (Exception e) {
                logger.error("stop drcclient service exception: " + entry.getKey(), (Throwable)e);
            }
        }
        this.checkNotAliveThreads();
        logger.info("cluster has been shutdown...");
    }

    private void checkNotAliveThreads() throws Exception {
        int notAlive = 0;
        for (Thread t : this.clientThreadPools.keySet()) {
            logger.info("service Thread:" + t.getName() + ", current caller Thread:" + Thread.currentThread().getName());
            if (t != Thread.currentThread() && !t.isAlive()) {
                ++notAlive;
                logger.info(t.getName() + " is NOT alive, stop service success");
                continue;
            }
            logger.warn(t.getName() + " is STILL alive, stop service failure");
        }
        if (notAlive != this.clientThreadPools.size()) {
            logger.error("cluster shutdown failure, still has alive threads, total:" + this.clientThreadPools.size() + ", alive:" + (this.clientThreadPools.size() - notAlive));
            throw new Exception("cluster shutdown failure, still has alive threads");
        }
    }

    private ClusterListener getNextListener() {
        if (this.listenerIndex >= this.listeners.size()) {
            this.listenerIndex = 0;
        }
        return this.listeners.get(this.listenerIndex++);
    }

    public void setGuid(String guid) {
        this.guid = guid;
    }

    public void setListeners(List<ClusterListener> listeners) {
        this.listeners = listeners;
    }

    public void setPartitions(PartitionPool partitionPool) {
        this.partitionPool = partitionPool;
    }

    public void setRegionRouterInfo(RegionRouterInfo regionRouterInfo) {
        this.regionRouterInfo = regionRouterInfo;
    }
}

