package org.apache.shardingsphere.shardingscaling.postgresql;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.Generated;
import org.apache.shardingsphere.shardingscaling.core.config.JDBCDataSourceConfiguration;
import org.apache.shardingsphere.shardingscaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.shardingscaling.core.exception.SyncTaskExecuteException;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.AbstractSyncExecutor;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.position.LogPosition;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.LogReader;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Record;
import org.apache.shardingsphere.shardingscaling.postgresql.wal.LogicalReplication;
import org.apache.shardingsphere.shardingscaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.shardingscaling.postgresql.wal.decode.DecodingPlugin;
import org.apache.shardingsphere.shardingscaling.postgresql.wal.decode.TestDecodingPlugin;
import org.postgresql.PGConnection;
import org.postgresql.jdbc.PgConnection;
import org.postgresql.replication.PGReplicationStream;

/* loaded from: input_file:org/apache/shardingsphere/shardingscaling/postgresql/PostgreSQLWalReader.class */
/**
 * Log reader that polls a PostgreSQL logical replication stream, decodes each WAL message and
 * pushes the converted record into a {@link Channel}.
 *
 * <p>Only {@link JDBCDataSourceConfiguration} data sources are accepted; the constructor rejects
 * every other configuration type.
 */
public final class PostgreSQLWalReader extends AbstractSyncExecutor implements LogReader {
    /** Pause, in milliseconds, between polls while the replication stream has nothing pending. */
    private static final long IDLE_SLEEP_MILLIS = 10L;

    /** Position whose log sequence number is handed to the replication stream on creation. */
    private final WalPosition walPosition;
    private final RdbmsConfiguration rdbmsConfiguration;
    /** Decoder for raw WAL messages; (re)created on every call to {@link #read(Channel)}. */
    private DecodingPlugin decodingPlugin;
    private final LogicalReplication logicalReplication = new LogicalReplication();
    private final WalEventConverter walEventConverter;
    /** Target channel used by {@link #run()}; must be set through {@link #setChannel(Channel)}. */
    private Channel channel;

    /**
     * Creates a reader for the given data source.
     *
     * @param rdbmsConfiguration configuration whose data source must be exactly a
     *                           {@link JDBCDataSourceConfiguration}
     * @param logPosition        start position; cast to {@link WalPosition}
     * @throws UnsupportedOperationException if the data source configuration is of another type
     * @throws ClassCastException            if {@code logPosition} is not a {@link WalPosition}
     */
    public PostgreSQLWalReader(RdbmsConfiguration rdbmsConfiguration, LogPosition logPosition) {
        this.walPosition = (WalPosition) logPosition;
        if (!JDBCDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
            throw new UnsupportedOperationException("PostgreSQLWalReader only support JDBCDataSourceConfiguration");
        }
        this.rdbmsConfiguration = rdbmsConfiguration;
        this.walEventConverter = new WalEventConverter(rdbmsConfiguration);
    }

    /** Marks the executor as started and reads into the channel set via {@link #setChannel(Channel)}. */
    public void run() {
        start();
        read(this.channel);
    }

    /**
     * Polls the replication stream until the executor is no longer running or the current thread
     * is interrupted, pushing every decoded and converted message into {@code channel}.
     *
     * <p>The replication connection is opened here and closed again when this method returns,
     * whether normally or exceptionally.
     *
     * @param channel destination of the converted records
     * @throws SyncTaskExecuteException if any JDBC call fails
     */
    public void read(Channel channel) {
        try {
            PGConnection pgConnection = this.logicalReplication.createPgConnection((JDBCDataSourceConfiguration) this.rdbmsConfiguration.getDataSourceConfiguration());
            // The connection is local to this call, so it has to be released here; nothing else holds it.
            try (Connection connection = (Connection) pgConnection) {
                this.decodingPlugin = new TestDecodingPlugin(connection.unwrap(PgConnection.class).getTimestampUtils());
                PGReplicationStream stream = this.logicalReplication.createReplicationStream(pgConnection, PostgreSQLLogPositionManager.SLOT_NAME, this.walPosition.getLogSequenceNumber());
                while (isRunning() && !Thread.currentThread().isInterrupted()) {
                    ByteBuffer message = stream.readPending();
                    if (message == null) {
                        // Nothing pending: back off briefly and poll again instead of decoding a null buffer.
                        try {
                            Thread.sleep(IDLE_SLEEP_MILLIS);
                        } catch (InterruptedException interrupted) {
                            // Keep the interrupt visible to the caller and stop reading.
                            Thread.currentThread().interrupt();
                            break;
                        }
                        continue;
                    }
                    pushRecord(channel, this.walEventConverter.convert(this.decodingPlugin.decode(message, stream.getLastReceiveLSN())));
                }
            }
        } catch (SQLException ex) {
            throw new SyncTaskExecuteException(ex);
        }
    }

    /**
     * Pushes one record into the channel. If the push is interrupted, the interrupt status is
     * restored so the read loop can observe it and terminate.
     */
    private void pushRecord(Channel channel, Record record) {
        try {
            channel.pushRecord(record);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets the channel that {@link #run()} reads into.
     *
     * @param channel destination channel
     */
    @Generated
    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
