package com.alibaba.dts.recordgenerator;

import com.alibaba.dts.common.Checkpoint;
import com.alibaba.dts.common.Context;
import com.alibaba.dts.common.Util;
import com.alibaba.dts.metastore.KafkaMetaStore;
import com.alibaba.dts.metastore.LocalFileMetaStore;
import com.alibaba.dts.metastore.MetaStoreCenter;
import com.alibaba.dts.recordprocessor.EtlRecordProcessor;
import com.alibaba.dts.recordprocessor.postgresql.PostgresqlFieldConverter;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/dts/recordgenerator/RecordGenerator.class */
public class RecordGenerator implements Runnable, Closeable {
    private static final Logger log = LoggerFactory.getLogger(RecordGenerator.class);
    private static final String LOCAL_FILE_STORE_NAME = "localCheckpointStore";
    private static final String KAFKA_STORE_NAME = "kafkaCheckpointStore";
    private final Properties properties;
    private final int tryTime;
    private final Context context;
    private final TopicPartition topicPartition;
    private final String groupID;
    private final ConsumerWrapFactory consumerWrapFactory;
    private final Checkpoint initialCheckpoint;
    private final AtomicBoolean useCheckpointConfig;
    private final ConsumerSubscribeMode subscribeMode;
    private final long tryBackTimeMS;
    private volatile Checkpoint toCommitCheckpoint = null;
    private final MetaStoreCenter metaStoreCenter = new MetaStoreCenter();
    private volatile boolean existed = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.alibaba.dts.recordgenerator.RecordGenerator$1, reason: invalid class name */
    /* loaded from: input_file:com/alibaba/dts/recordgenerator/RecordGenerator$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$alibaba$dts$recordgenerator$RecordGenerator$ConsumerSubscribeMode = new int[ConsumerSubscribeMode.values().length];

        static {
            try {
                $SwitchMap$com$alibaba$dts$recordgenerator$RecordGenerator$ConsumerSubscribeMode[ConsumerSubscribeMode.SUBSCRIBE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$alibaba$dts$recordgenerator$RecordGenerator$ConsumerSubscribeMode[ConsumerSubscribeMode.ASSIGN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/alibaba/dts/recordgenerator/RecordGenerator$ConsumerSubscribeMode.class */
    public enum ConsumerSubscribeMode {
        ASSIGN,
        SUBSCRIBE,
        UNKNOWN
    }

    public RecordGenerator(Properties properties, Context context, Checkpoint checkpoint, ConsumerWrapFactory consumerWrapFactory) {
        this.properties = properties;
        this.tryTime = Integer.parseInt(properties.getProperty(Names.TRY_TIME, "150"));
        this.tryBackTimeMS = Long.parseLong(properties.getProperty(Names.TRY_BACK_TIME_MS, "10000"));
        this.context = context;
        this.consumerWrapFactory = consumerWrapFactory;
        this.initialCheckpoint = checkpoint;
        this.topicPartition = new TopicPartition(properties.getProperty(Names.KAFKA_TOPIC), 0);
        this.groupID = properties.getProperty(Names.GROUP_NAME);
        this.subscribeMode = parseConsumerSubscribeMode(properties.getProperty(Names.SUBSCRIBE_MODE_NAME, "assign"));
        this.useCheckpointConfig = new AtomicBoolean(StringUtils.equalsIgnoreCase(properties.getProperty(Names.USE_CONFIG_CHECKPOINT_NAME), "true"));
        this.metaStoreCenter.registerStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, this.groupID), new LocalFileMetaStore(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, this.groupID)));
        log.info("RecordGenerator: try time [" + this.tryTime + "], try backTimeMS [" + this.tryBackTimeMS + "]");
    }

    private String composeLocalFileStoreName(String str, String str2) {
        return StringUtils.join(new String[]{str, "-", str2});
    }

    private ConsumerWrap getConsumerWrap() {
        return this.consumerWrapFactory.getConsumerWrap(this.properties);
    }

    @Override // java.lang.Runnable
    public void run() {
        int i = 0;
        String str = "first start";
        ConsumerWrap consumerWrap = null;
        while (!this.existed) {
            EtlRecordProcessor recordProcessor = this.context.getRecordProcessor();
            try {
                try {
                    consumerWrap = getConsumerWrap(str);
                    while (!this.existed) {
                        mayCommitCheckpoint();
                        Iterator it = consumerWrap.poll().iterator();
                        while (it.hasNext()) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                            int i2 = 0;
                            if (consumerRecord.value() != null && ((byte[]) consumerRecord.value()).length > 2) {
                                while (!recordProcessor.offer(1000L, TimeUnit.MILLISECONDS, consumerRecord) && !this.existed) {
                                    i2++;
                                    if (i2 % 10 == 0) {
                                        log.info("RecordGenerator: offer record has failed for a period (10s) [ " + consumerRecord + "]");
                                    }
                                }
                            }
                        }
                    }
                    Util.swallowErrorClose(consumerWrap);
                } catch (Throwable th) {
                    if (isErrorRecoverable(th)) {
                        int i3 = i;
                        i++;
                        if (i3 < this.tryTime) {
                            log.warn("RecordGenerator: error meet cause " + th.getMessage() + ", recover time [" + i + "]", th);
                            Util.sleepMS(this.tryBackTimeMS);
                            str = "reconnect";
                            Util.swallowErrorClose(consumerWrap);
                        }
                    }
                    log.error("RecordGenerator: unrecoverable error  " + th.getMessage() + ", have try time [" + i + "]", th);
                    this.existed = true;
                    Util.swallowErrorClose(consumerWrap);
                }
            } catch (Throwable th2) {
                Util.swallowErrorClose(consumerWrap);
                throw th2;
            }
        }
    }

    private void mayCommitCheckpoint() {
        if (null != this.toCommitCheckpoint) {
            commitCheckpoint(this.toCommitCheckpoint.getTopicPartition(), this.toCommitCheckpoint);
            this.toCommitCheckpoint = null;
        }
    }

    public void setToCommitCheckpoint(Checkpoint checkpoint) {
        this.toCommitCheckpoint = checkpoint;
    }

    private ConsumerWrap getConsumerWrap(String str) {
        Checkpoint checkpoint;
        ConsumerWrap consumerWrap = getConsumerWrap();
        this.metaStoreCenter.registerStore(KAFKA_STORE_NAME, new KafkaMetaStore(consumerWrap.getRawConsumer()));
        if (this.useCheckpointConfig.compareAndSet(true, false)) {
            log.info("RecordGenerator: force use initial checkpoint [{}] to start", (Object) null);
            checkpoint = this.initialCheckpoint;
        } else {
            checkpoint = getCheckpoint();
            if (null == checkpoint || Checkpoint.INVALID_STREAM_CHECKPOINT == checkpoint) {
                checkpoint = this.initialCheckpoint;
                log.info("RecordGenerator: use initial checkpoint [{}] to start", checkpoint);
            } else {
                log.info("RecordGenerator: load checkpoint from checkpoint store success, current checkpoint [{}]", checkpoint);
            }
        }
        switch (AnonymousClass1.$SwitchMap$com$alibaba$dts$recordgenerator$RecordGenerator$ConsumerSubscribeMode[this.subscribeMode.ordinal()]) {
            case PostgresqlFieldConverter.BIT /* 1 */:
                consumerWrap.subscribeTopic(this.topicPartition, () -> {
                    Checkpoint seek = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
                    if (null == seek) {
                        seek = this.initialCheckpoint;
                    }
                    return seek;
                });
                break;
            case PostgresqlFieldConverter.BIG_DECIMAL /* 2 */:
                consumerWrap.assignTopic(this.topicPartition, checkpoint);
                break;
            default:
                throw new RuntimeException("RecordGenerator: unknown mode not support");
        }
        log.info("RecordGenerator:" + str + ", checkpoint " + checkpoint);
        return consumerWrap;
    }

    private Checkpoint getCheckpoint() {
        Checkpoint seek = this.metaStoreCenter.seek(composeLocalFileStoreName(LOCAL_FILE_STORE_NAME, this.groupID), this.topicPartition, this.groupID);
        if (null == seek) {
            seek = this.metaStoreCenter.seek(KAFKA_STORE_NAME, this.topicPartition, this.groupID);
        }
        return seek;
    }

    public void commitCheckpoint(TopicPartition topicPartition, Checkpoint checkpoint) {
        if (null == topicPartition || null == checkpoint) {
            return;
        }
        this.metaStoreCenter.store(topicPartition, this.groupID, checkpoint);
    }

    private boolean isErrorRecoverable(Throwable th) {
        return true;
    }

    public Checkpoint getInitialCheckpoint() {
        return this.initialCheckpoint;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.existed = true;
    }

    private ConsumerSubscribeMode parseConsumerSubscribeMode(String str) {
        if (StringUtils.equalsIgnoreCase("assign", str)) {
            return ConsumerSubscribeMode.ASSIGN;
        }
        if (StringUtils.equalsIgnoreCase("subscribe", str)) {
            return ConsumerSubscribeMode.SUBSCRIBE;
        }
        throw new RuntimeException("RecordGenerator: unknown subscribe mode [" + str + "]");
    }
}
