package cn.insmart.fx.dts.aliyun.service;

import cn.insmart.fx.dts.service.RecordService;
import com.alibaba.dts.common.RecordListener;
import com.alibaba.dts.recordgenerator.RecordGenerator;
import com.alibaba.dts.recordprocessor.EtlRecordProcessor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:cn/insmart/fx/dts/aliyun/service/DtsServiceImpl.class */
public class DtsServiceImpl implements RecordService, InitializingBean {
    private static final int DELAY_SECONDS = 1;
    private final RecordGenerator recordGenerator;
    private final EtlRecordProcessor recordProcessor;
    private final RecordListener handleRecordListener;
    private static final Logger log = LoggerFactory.getLogger(DtsServiceImpl.class);
    private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

    private void runRecordProcessor() {
        try {
            log.info("dts record processor run...");
            this.recordProcessor.run();
        } catch (Exception e) {
            this.recordProcessor.close();
            log.error("dts processor error", e);
        } finally {
            scheduleRecordProcessor();
        }
    }

    private void runRecordGenerator() {
        try {
            log.info("dts record generator run...");
            this.recordGenerator.run();
        } catch (Exception e) {
            this.recordGenerator.close();
            log.error("dts generator error", e);
        } finally {
            scheduleRecordGenerator();
        }
    }

    public void afterPropertiesSet() {
        schedule();
    }

    public void schedule() {
        this.recordProcessor.registerRecordListener(this.handleRecordListener.getClass().getSimpleName(), this.handleRecordListener);
        scheduleRecordGenerator();
        scheduleRecordProcessor();
    }

    public void scheduleRecordGenerator() {
        executorService.schedule(this::runRecordGenerator, 1L, TimeUnit.SECONDS);
    }

    public void scheduleRecordProcessor() {
        executorService.schedule(this::runRecordProcessor, 1L, TimeUnit.SECONDS);
    }

    public DtsServiceImpl(RecordGenerator recordGenerator, EtlRecordProcessor etlRecordProcessor, RecordListener recordListener) {
        this.recordGenerator = recordGenerator;
        this.recordProcessor = etlRecordProcessor;
        this.handleRecordListener = recordListener;
    }
}
