/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.dts.subscribe.clients.check.util;

import com.aliyun.dts.subscribe.clients.check.util.NodeCommandClientConfig;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

public class NodeCommandClient {
    /**
     * Configuration definition used to parse {@link CommandClientConfig} instances: the base
     * definition from {@code NodeCommandClientConfig.configDef()} extended with the client-side
     * SASL and SSL settings.
     */
    public static final ConfigDef DEFAULT_CLIENT_CONFIG = NodeCommandClientConfig.configDef().withClientSaslSupport().withClientSslSupport();

    /**
     * Builds the log context whose prefix tags every log line with the given client id.
     *
     * @param clientId the client id to embed in the log prefix
     * @return a new {@link LogContext} with the prefix {@code "[NodeCommandClient clientId=<id>] "}
     */
    private static LogContext createLogContext(String clientId) {
        String logPrefix = "[NodeCommandClient clientId=" + clientId + "] ";
        return new LogContext(logPrefix);
    }

    /**
     * Small synchronous wrapper around a Kafka {@link NetworkClient} that connects to the
     * configured bootstrap servers and performs blocking request/response round trips.
     *
     * <p>Failures while polling, sending or validating a response are reported as
     * {@link CommandClientProviderException}. Call {@link #close()} when done so the network
     * client and the metric reporters created by the constructor are released.
     */
    public static final class CommandClient {
        private final KafkaClient kafkaClient;
        private final CommandClientConfig commandClientConfig;
        /** Reporters created in the constructor; they are owned by this client and closed in {@link #close()}. */
        private final List<MetricsReporter> reporters;
        private static final int MAX_INFLIGHT_REQUESTS = 100;
        private static final AtomicInteger incrementCounter = new AtomicInteger(100);

        /**
         * Creates the network client from the given configuration.
         *
         * @param config client configuration; the keys read here are {@code client.id},
         *               {@code retry.backoff.ms}, {@code metadata.max.age.ms}, {@code bootstrap.servers},
         *               {@code metrics.num.samples}, {@code metrics.sample.window.ms}, {@code metric.reporters},
         *               {@code connections.max.idle.ms}, {@code reconnect.backoff.ms},
         *               {@code reconnect_back_off_time}, {@code send.buffer.bytes},
         *               {@code receive.buffer.bytes} and {@code request.timeout.ms}
         */
        public CommandClient(CommandClientConfig config) {
            this.commandClientConfig = config;
            Time time = new SystemTime();
            Map<String, String> metricTags = new LinkedHashMap<String, String>();
            String clientId = config.getString("client.id");
            metricTags.put("client-id", clientId);
            Metadata metadata = CommandClient.newBootstrapMetadata(config, time.milliseconds());
            MetricConfig metricConfig = new MetricConfig()
                .samples(config.getInt("metrics.num.samples").intValue())
                .timeWindow(config.getLong("metrics.sample.window.ms").longValue(), TimeUnit.MILLISECONDS)
                .tags(metricTags);
            this.reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
            this.reporters.add(new JmxReporter("kafka.admin.client"));
            Metrics metrics = new Metrics(metricConfig, this.reporters, time);
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
            LogContext logContext = NodeCommandClient.createLogContext(clientId);
            Selector selector = new Selector(config.getLong("connections.max.idle.ms").longValue(), metrics, time, "kafka-client", channelBuilder, logContext);
            this.kafkaClient = new NetworkClient(
                selector,
                metadata,
                clientId,
                MAX_INFLIGHT_REQUESTS,
                config.getLong("reconnect.backoff.ms").longValue(),
                config.getLong("reconnect_back_off_time").longValue(),
                config.getInt("send.buffer.bytes").intValue(),
                config.getInt("receive.buffer.bytes").intValue(),
                config.getInt("request.timeout.ms").intValue(),
                time,
                true,
                new ApiVersions(),
                logContext);
        }

        /**
         * Builds a {@link Metadata} instance seeded with the configured bootstrap servers.
         *
         * @param config configuration supplying {@code retry.backoff.ms}, {@code metadata.max.age.ms}
         *               and {@code bootstrap.servers}
         * @param nowMs  timestamp in milliseconds recorded as the metadata update time
         * @return metadata whose cluster consists of the bootstrap addresses
         */
        private static Metadata newBootstrapMetadata(CommandClientConfig config, long nowMs) {
            Metadata metadata = new Metadata(config.getLong("retry.backoff.ms").longValue(), config.getLong("metadata.max.age.ms").longValue(), false);
            Cluster bootstrapCluster = Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers")));
            metadata.update(bootstrapCluster, Collections.<String>emptySet(), nowMs);
            return metadata;
        }

        /**
         * Closes the network client and every metrics reporter created by the constructor.
         *
         * <p>Shutdown is best-effort: exceptions raised while closing are suppressed so that the
         * remaining resources are still released and callers never see a failure from this method.
         */
        public void close() {
            try {
                this.kafkaClient.close();
            }
            catch (Exception ignored) {
                // Best-effort shutdown: a failure to close the connection must not reach the caller.
            }
            finally {
                // The reporters (including the JmxReporter added in the constructor) are owned by
                // this client, so they have to be released here or they leak on every close.
                for (MetricsReporter reporter : this.reporters) {
                    try {
                        reporter.close();
                    }
                    catch (Exception ignoredReporterFailure) {
                        // Keep closing the remaining reporters.
                    }
                }
            }
        }

        /**
         * Polls the network client until one of the given nodes reports ready or
         * {@code request.timeout.ms} has elapsed.
         *
         * @param nodes candidate nodes, checked in list order on every round
         * @return the first node found ready
         * @throws CommandClientProviderException if polling fails or no node became ready in time
         */
        private Node ensureOneNodeIsReady(List<Node> nodes) {
            Node readyNode = null;
            long deadlineMs = Time.SYSTEM.milliseconds() + (long)this.commandClientConfig.getInt("request.timeout.ms").intValue();
            while (readyNode == null && Time.SYSTEM.milliseconds() < deadlineMs) {
                for (Node candidate : nodes) {
                    if (this.kafkaClient.ready(candidate, Time.SYSTEM.milliseconds())) {
                        readyNode = candidate;
                        break;
                    }
                }
                // A poll is issued on every round, including the one in which a ready node was found.
                try {
                    this.kafkaClient.poll(this.commandClientConfig.getLong("poll.ms").longValue(), Time.SYSTEM.milliseconds());
                }
                catch (Exception e) {
                    throw new CommandClientProviderException("Could not poll.", e);
                }
            }
            if (readyNode == null) {
                throw new CommandClientProviderException("Could not find any available broker. Check your NodeCommandClientConfig setting 'bootstrap.servers'. This error might also occur, if you try to connect to pre-0.10 brokers. Kafka Streams requires broker version 0.10.1.x or higher.");
            }
            return readyNode;
        }

        /**
         * Sends the request and polls until its response arrives or {@code request.timeout.ms} elapses.
         *
         * @param clientRequest the request to send
         * @return the single response whose correlation id matches the request
         * @throws CommandClientProviderException if sending or polling fails, more than one response
         *         is returned by a poll, the correlation id does not match, or the timeout expires
         */
        private ClientResponse sendRequestSync(ClientRequest clientRequest) {
            try {
                this.kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
            }
            catch (Exception e) {
                throw new CommandClientProviderException("Could not send request.", e);
            }
            long deadlineMs = Time.SYSTEM.milliseconds() + (long)this.commandClientConfig.getInt("request.timeout.ms").intValue();
            while (Time.SYSTEM.milliseconds() < deadlineMs) {
                List<ClientResponse> responses;
                try {
                    responses = this.kafkaClient.poll(this.commandClientConfig.getLong("poll.ms").longValue(), Time.SYSTEM.milliseconds());
                }
                catch (IllegalStateException e) {
                    throw new CommandClientProviderException("Could not poll.", e);
                }
                if (responses.isEmpty()) {
                    // Nothing received yet; keep polling until the deadline.
                    continue;
                }
                if (responses.size() > 1) {
                    throw new CommandClientProviderException("Sent one request but received multiple or no responses.");
                }
                ClientResponse response = responses.get(0);
                if (response.requestHeader().correlationId() != clientRequest.correlationId()) {
                    throw new CommandClientProviderException("Inconsistent response received from the broker " + clientRequest.destination() + ", expected correlation id " + clientRequest.correlationId() + ", but received " + response.requestHeader().correlationId());
                }
                return response;
            }
            throw new CommandClientProviderException("Failed to get response from broker within timeout");
        }

        /**
         * Sends one request to {@code destination} and returns the response body as {@code type}.
         *
         * @param destination node to send the request to; must be non-null and non-empty
         * @param builder     builder for the request to send
         * @param type        expected class of the response body
         * @param typeName    short name of the request/response kind, used in error messages
         * @param <T>         expected response type
         * @return the response body cast to {@code type}
         * @throws CommandClientProviderException if the destination is missing, the node does not
         *         become ready, the exchange fails, or the response is empty or of another type
         */
        private <T extends AbstractResponse> T doRequest(Node destination, AbstractRequest.Builder<?> builder, Class<T> type, String typeName) {
            // Validate before touching the network client so a missing destination yields this
            // error rather than a failure from inside the readiness check.
            if (null == destination || destination.isEmpty()) {
                throw new CommandClientProviderException("destination is required");
            }
            this.ensureOneNodeIsReady(Collections.singletonList(destination));
            ClientRequest clientRequest = this.kafkaClient.newClientRequest(destination.idString(), builder, Time.SYSTEM.milliseconds(), true);
            ClientResponse clientResponse = this.sendRequestSync(clientRequest);
            if (!clientResponse.hasResponse()) {
                throw new CommandClientProviderException("Empty response for client request.");
            }
            AbstractResponse body = clientResponse.responseBody();
            if (!type.isInstance(body)) {
                throw new CommandClientProviderException("Inconsistent response type for internal topic  " + typeName + "Request. Expected " + typeName + "Response but received " + body.getClass().getName());
            }
            return type.cast(body);
        }

        /**
         * Requests metadata for all topics from any bootstrap broker that is ready.
         *
         * @return the metadata response
         * @throws CommandClientProviderException if no broker is ready or the request fails
         */
        public MetadataResponse fetchMetadata() {
            return this.doRequest(this.getAnyReadyBrokerId(), MetadataRequest.Builder.allTopics(), MetadataResponse.class, "MetaData");
        }

        /**
         * Resolves the configured bootstrap servers to nodes and waits for one of them to be ready.
         *
         * @return the first bootstrap node found ready
         * @throws CommandClientProviderException if no node becomes ready within the timeout
         */
        private Node getAnyReadyBrokerId() {
            Metadata metadata = CommandClient.newBootstrapMetadata(this.commandClientConfig, Time.SYSTEM.milliseconds());
            List<Node> nodes = metadata.fetch().nodes();
            return this.ensureOneNodeIsReady(nodes);
        }
    }

    /**
     * Unchecked exception raised by {@code CommandClient} when a request cannot be sent, polled
     * for, or answered with a valid response.
     */
    public static final class CommandClientProviderException extends RuntimeException {
        /**
         * Creates an exception that wraps an underlying failure.
         *
         * @param message description of what failed
         * @param e       the underlying cause
         */
        public CommandClientProviderException(String message, Exception e) {
            super(message, e);
        }

        /**
         * Creates an exception with a message and no cause.
         *
         * @param message description of what failed
         */
        public CommandClientProviderException(String message) {
            super(message);
        }
    }

    /**
     * Configuration for {@code CommandClient}, parsed against {@code DEFAULT_CLIENT_CONFIG}.
     */
    public static class CommandClientConfig extends AbstractConfig {
        /**
         * Creates a client configuration from the original (unparsed) settings of the given
         * {@code NodeCommandClientConfig}.
         *
         * @param NodeCommandClientConfig2 the source configuration whose originals are copied over
         * @return a new {@code CommandClientConfig} built from those settings
         */
        public static CommandClientConfig fromNodeCommandClientConfig(NodeCommandClientConfig NodeCommandClientConfig2) {
            Map<String, Object> sourceSettings = NodeCommandClientConfig2.originals();
            return new CommandClientConfig(sourceSettings);
        }

        /**
         * Parses the given settings using {@code DEFAULT_CLIENT_CONFIG}.
         *
         * @param originals raw configuration key/value pairs
         */
        public CommandClientConfig(Map<?, ?> originals) {
            super(DEFAULT_CLIENT_CONFIG, originals);
        }
    }
}

