package io.debezium.connector.postgresql.connection;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.postgresql.PGConnection;
import org.postgresql.replication.LogSequenceNumber;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresReplicationConnection.class */
public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
    private static Logger LOGGER;
    private final String slotName;
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    private final boolean dropSlotOnClose;
    private final Configuration originalConfig;
    private final Integer statusUpdateIntervalMillis;
    private final MessageDecoder messageDecoder;
    private long defaultStartingPos;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresReplicationConnection$ReplicationConnectionBuilder.class */
    protected static class ReplicationConnectionBuilder implements ReplicationConnection.Builder {
        private final Configuration config;
        private String slotName = ReplicationConnection.Builder.DEFAULT_SLOT_NAME;
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private Integer statusUpdateIntervalMillis;
        static final /* synthetic */ boolean $assertionsDisabled;

        /* JADX INFO: Access modifiers changed from: protected */
        public ReplicationConnectionBuilder(Configuration configuration) {
            if (!$assertionsDisabled && configuration == null) {
                throw new AssertionError();
            }
            this.config = configuration;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder withSlot(String str) {
            if (!$assertionsDisabled && str == null) {
                throw new AssertionError();
            }
            this.slotName = str;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder logicalDecoder) {
            if (!$assertionsDisabled && logicalDecoder == null) {
                throw new AssertionError();
            }
            this.plugin = logicalDecoder;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder dropSlotOnClose(boolean z) {
            this.dropSlotOnClose = z;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder statusUpdateIntervalMillis(Integer num) {
            this.statusUpdateIntervalMillis = num;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection build() {
            if ($assertionsDisabled || this.plugin != null) {
                return new PostgresReplicationConnection(this.config, this.slotName, this.plugin, this.dropSlotOnClose, this.statusUpdateIntervalMillis);
            }
            throw new AssertionError("Decoding plugin name is not set");
        }

        static {
            $assertionsDisabled = !PostgresReplicationConnection.class.desiredAssertionStatus();
        }
    }

    private PostgresReplicationConnection(Configuration configuration, String str, PostgresConnectorConfig.LogicalDecoder logicalDecoder, boolean z, Integer num) {
        super(configuration, PostgresConnection.FACTORY, (JdbcConnection.Operations) null, PostgresReplicationConnection::defaultSettings);
        this.originalConfig = configuration;
        this.slotName = str;
        this.plugin = logicalDecoder;
        this.dropSlotOnClose = z;
        this.statusUpdateIntervalMillis = num;
        this.messageDecoder = logicalDecoder.messageDecoder();
        try {
            initReplicationSlot();
        } catch (SQLException e) {
            throw new JdbcConnectionException("Cannot create replication connection", e);
        }
    }

    protected void initReplicationSlot() throws SQLException {
        PostgresConnection postgresConnection = new PostgresConnection(this.originalConfig);
        Throwable th = null;
        try {
            try {
                ServerInfo.ReplicationSlot readReplicationSlotInfo = postgresConnection.readReplicationSlotInfo(this.slotName, this.plugin.getValue());
                if (postgresConnection != null) {
                    if (0 != 0) {
                        try {
                            postgresConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        postgresConnection.close();
                    }
                }
                boolean z = ServerInfo.ReplicationSlot.INVALID == readReplicationSlotInfo;
                try {
                    if (z) {
                        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", this.slotName, this.plugin);
                        pgConnection().getReplicationAPI().createReplicationSlot().logical().withSlotName(this.slotName).withOutputPlugin(this.plugin.getValue()).make();
                    } else if (readReplicationSlotInfo.active()) {
                        LOGGER.error("A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server.You cannot have multiple slots with the same name active for the same database", new Object[]{this.slotName, this.plugin.getValue(), database()});
                        throw new IllegalStateException();
                    }
                    AtomicLong atomicLong = new AtomicLong();
                    execute(statement -> {
                        LOGGER.debug("running '{}' to validate replication connection", "IDENTIFY_SYSTEM");
                        ResultSet executeQuery = statement.executeQuery("IDENTIFY_SYSTEM");
                        Throwable th3 = null;
                        try {
                            if (!executeQuery.next()) {
                                throw new IllegalStateException("The DB connection is not a valid replication connection");
                            }
                            String string = executeQuery.getString("xlogpos");
                            LOGGER.debug("received latest xlogpos '{}'", string);
                            atomicLong.compareAndSet(0L, LogSequenceNumber.valueOf(string).asLong());
                            if (executeQuery != null) {
                                if (0 == 0) {
                                    executeQuery.close();
                                    return;
                                }
                                try {
                                    executeQuery.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            }
                        } catch (Throwable th5) {
                            if (executeQuery != null) {
                                if (0 != 0) {
                                    try {
                                        executeQuery.close();
                                    } catch (Throwable th6) {
                                        th3.addSuppressed(th6);
                                    }
                                } else {
                                    executeQuery.close();
                                }
                            }
                            throw th5;
                        }
                    });
                    if (z || !readReplicationSlotInfo.hasValidFlushedLSN()) {
                        this.defaultStartingPos = atomicLong.get();
                    } else {
                        Long latestFlushedLSN = readReplicationSlotInfo.latestFlushedLSN();
                        this.defaultStartingPos = latestFlushedLSN.longValue() < atomicLong.get() ? latestFlushedLSN.longValue() : atomicLong.get();
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("found previous flushed LSN '{}'", ReplicationConnection.format(latestFlushedLSN.longValue()));
                        }
                    }
                } catch (SQLException e) {
                    throw new JdbcConnectionException(e);
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (postgresConnection != null) {
                if (th != null) {
                    try {
                        postgresConnection.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    postgresConnection.close();
                }
            }
            throw th3;
        }
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public ReplicationStream startStreaming() throws SQLException {
        return startStreaming(Long.valueOf(this.defaultStartingPos));
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public ReplicationStream startStreaming(Long l) throws SQLException {
        connect();
        if (l == null || l.longValue() <= 0) {
            l = Long.valueOf(this.defaultStartingPos);
        }
        LogSequenceNumber valueOf = LogSequenceNumber.valueOf(l.longValue());
        LOGGER.debug("starting streaming from LSN '{}'", valueOf.asString());
        return createReplicationStream(valueOf);
    }

    protected PGConnection pgConnection() throws SQLException {
        return connection();
    }

    private ReplicationStream createReplicationStream(LogSequenceNumber logSequenceNumber) throws SQLException {
        PGReplicationStream startPgReplicationStream;
        try {
            MessageDecoder messageDecoder = this.messageDecoder;
            messageDecoder.getClass();
            startPgReplicationStream = startPgReplicationStream(logSequenceNumber, messageDecoder::optionsWithMetadata);
            this.messageDecoder.setContainsMetadata(true);
        } catch (PSQLException e) {
            if (!e.getMessage().matches("(?s)ERROR: option .* is unknown.*")) {
                throw e;
            }
            LOGGER.warn("Could not register for streaming with metadata in messages, falling back to messages without metadata");
            MessageDecoder messageDecoder2 = this.messageDecoder;
            messageDecoder2.getClass();
            startPgReplicationStream = startPgReplicationStream(logSequenceNumber, messageDecoder2::optionsWithoutMetadata);
            this.messageDecoder.setContainsMetadata(false);
        }
        final PGReplicationStream pGReplicationStream = startPgReplicationStream;
        final long asLong = logSequenceNumber.asLong();
        return new ReplicationStream() { // from class: io.debezium.connector.postgresql.connection.PostgresReplicationConnection.1
            private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
            private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
            private volatile LogSequenceNumber lastReceivedLSN;

            @Override // io.debezium.connector.postgresql.connection.ReplicationStream
            public void read(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException {
                ByteBuffer read = pGReplicationStream.read();
                if (asLong >= pGReplicationStream.getLastReceiveLSN().asLong()) {
                    return;
                }
                deserializeMessages(read, replicationMessageProcessor);
            }

            @Override // io.debezium.connector.postgresql.connection.ReplicationStream
            public void readPending(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException {
                ByteBuffer readPending = pGReplicationStream.readPending();
                if (readPending == null || asLong >= pGReplicationStream.getLastReceiveLSN().asLong()) {
                    return;
                }
                deserializeMessages(readPending, replicationMessageProcessor);
            }

            private void deserializeMessages(ByteBuffer byteBuffer, ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException {
                this.lastReceivedLSN = pGReplicationStream.getLastReceiveLSN();
                PostgresReplicationConnection.this.messageDecoder.processMessage(byteBuffer, replicationMessageProcessor);
            }

            @Override // io.debezium.connector.postgresql.connection.ReplicationStream, java.lang.AutoCloseable
            public void close() throws SQLException {
                processWarnings(true);
                pGReplicationStream.close();
            }

            @Override // io.debezium.connector.postgresql.connection.ReplicationStream
            public void flushLSN() throws SQLException {
                if (this.lastReceivedLSN == null) {
                    return;
                }
                pGReplicationStream.setFlushedLSN(this.lastReceivedLSN);
                pGReplicationStream.setAppliedLSN(this.lastReceivedLSN);
                pGReplicationStream.forceUpdateStatus();
            }

            @Override // io.debezium.connector.postgresql.connection.ReplicationStream
            public Long lastReceivedLSN() {
                if (this.lastReceivedLSN != null) {
                    return Long.valueOf(this.lastReceivedLSN.asLong());
                }
                return null;
            }

            private void processWarnings(boolean z) throws SQLException {
                int i = this.warningCheckCounter - 1;
                this.warningCheckCounter = i;
                if (i != 0 && !z) {
                    return;
                }
                this.warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
                SQLWarning warnings = PostgresReplicationConnection.this.connection().getWarnings();
                while (true) {
                    SQLWarning sQLWarning = warnings;
                    if (sQLWarning == null) {
                        return;
                    }
                    PostgresReplicationConnection.LOGGER.debug("Server-side message: '{}', state = {}, code = {}", new Object[]{sQLWarning.getMessage(), sQLWarning.getSQLState(), Integer.valueOf(sQLWarning.getErrorCode())});
                    warnings = sQLWarning.getNextWarning();
                }
            }
        };
    }

    private PGReplicationStream startPgReplicationStream(LogSequenceNumber logSequenceNumber, Function<ChainedLogicalStreamBuilder, ChainedLogicalStreamBuilder> function) throws SQLException {
        if (!$assertionsDisabled && logSequenceNumber == null) {
            throw new AssertionError();
        }
        ChainedLogicalStreamBuilder apply = function.apply((ChainedLogicalStreamBuilder) pgConnection().getReplicationAPI().replicationStream().logical().withSlotName(this.slotName).withStartPosition(logSequenceNumber));
        if (this.statusUpdateIntervalMillis != null && this.statusUpdateIntervalMillis.intValue() > 0) {
            apply.withStatusInterval(this.statusUpdateIntervalMillis.intValue(), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream start = apply.start();
        try {
            Thread.sleep(10L);
        } catch (Exception e) {
        }
        start.forceUpdateStatus();
        return start;
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        try {
            super.close();
        } catch (SQLException e) {
            LOGGER.error("Unexpected error while closing Postgres connection", e);
        }
        if (this.dropSlotOnClose) {
            PostgresConnection postgresConnection = new PostgresConnection(this.originalConfig);
            Throwable th = null;
            try {
                postgresConnection.dropReplicationSlot(this.slotName);
                if (postgresConnection != null) {
                    if (0 == 0) {
                        postgresConnection.close();
                        return;
                    }
                    try {
                        postgresConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (postgresConnection != null) {
                    if (0 != 0) {
                        try {
                            postgresConnection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        postgresConnection.close();
                    }
                }
                throw th3;
            }
        }
    }

    protected static void defaultSettings(Configuration.Builder builder) {
        PostgresConnection.defaultSettings(builder);
        builder.with("replication", "database").with("preferQueryMode", "simple");
    }

    static {
        $assertionsDisabled = !PostgresReplicationConnection.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    }
}
