package io.debezium.connector.postgresql.connection;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.relational.RelationalTableFilters;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.time.Duration;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.ServerVersion;
import org.postgresql.replication.PGReplicationStream;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresReplicationConnection.class */
public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
    private static final String SQL_STATE_INSUFFICIENT_PRIVILEGE = "42501";
    private static Logger LOGGER;
    private final String slotName;
    private final String publicationName;
    private final RelationalTableFilters tableFilter;
    private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
    private final PostgresConnectorConfig.LogicalDecoder plugin;
    private final boolean dropSlotOnClose;
    private final PostgresConnectorConfig connectorConfig;
    private final Duration statusUpdateInterval;
    private final MessageDecoder messageDecoder;
    private final PostgresConnection jdbcConnection;
    private final TypeRegistry typeRegistry;
    private final Properties streamParams;
    private Lsn defaultStartingPos;
    private SlotCreationResult slotCreationInfo;
    private boolean hasInitedSlot;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.debezium.connector.postgresql.connection.PostgresReplicationConnection$2, reason: invalid class name */
    /* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresReplicationConnection$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode = new int[CommonConnectorConfig.EventProcessingFailureHandlingMode.values().length];

        static {
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.FAIL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.SKIP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[CommonConnectorConfig.EventProcessingFailureHandlingMode.IGNORE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            $SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode = new int[PostgresConnectorConfig.AutoCreateMode.values().length];
            try {
                $SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[PostgresConnectorConfig.AutoCreateMode.DISABLED.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$io$debezium$connector$postgresql$PostgresConnectorConfig$AutoCreateMode[PostgresConnectorConfig.AutoCreateMode.FILTERED.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/postgresql/connection/PostgresReplicationConnection$ReplicationConnectionBuilder.class */
    public static class ReplicationConnectionBuilder implements ReplicationConnection.Builder {
        private final PostgresConnectorConfig config;
        private RelationalTableFilters tableFilter;
        private Duration statusUpdateIntervalVal;
        private TypeRegistry typeRegistry;
        private PostgresSchema schema;
        private PostgresConnection jdbcConnection;
        static final /* synthetic */ boolean $assertionsDisabled;
        private String slotName = ReplicationConnection.Builder.DEFAULT_SLOT_NAME;
        private String publicationName = ReplicationConnection.Builder.DEFAULT_PUBLICATION_NAME;
        private PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode = PostgresConnectorConfig.AutoCreateMode.ALL_TABLES;
        private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
        private boolean dropSlotOnClose = true;
        private Properties slotStreamParams = new Properties();

        /* JADX INFO: Access modifiers changed from: protected */
        public ReplicationConnectionBuilder(PostgresConnectorConfig postgresConnectorConfig) {
            if (!$assertionsDisabled && postgresConnectorConfig == null) {
                throw new AssertionError();
            }
            this.config = postgresConnectorConfig;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder withSlot(String str) {
            if (!$assertionsDisabled && str == null) {
                throw new AssertionError();
            }
            this.slotName = str;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder withPublication(String str) {
            if (!$assertionsDisabled && str == null) {
                throw new AssertionError();
            }
            this.publicationName = str;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder withTableFilter(RelationalTableFilters relationalTableFilters) {
            if (!$assertionsDisabled && relationalTableFilters == null) {
                throw new AssertionError();
            }
            this.tableFilter = relationalTableFilters;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder withPublicationAutocreateMode(PostgresConnectorConfig.AutoCreateMode autoCreateMode) {
            if (!$assertionsDisabled && this.publicationName == null) {
                throw new AssertionError();
            }
            this.publicationAutocreateMode = autoCreateMode;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder withPlugin(PostgresConnectorConfig.LogicalDecoder logicalDecoder) {
            if (!$assertionsDisabled && logicalDecoder == null) {
                throw new AssertionError();
            }
            this.plugin = logicalDecoder;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder dropSlotOnClose(boolean z) {
            this.dropSlotOnClose = z;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder streamParams(String str) {
            if (str != null && !str.isEmpty()) {
                this.slotStreamParams = new Properties();
                for (String str2 : str.split(";")) {
                    String[] split = str2.split("=");
                    if (split.length == 2) {
                        this.slotStreamParams.setProperty(split[0], split[1]);
                    } else {
                        PostgresReplicationConnection.LOGGER.warn("The following STREAM_PARAMS value is invalid: {}", str2);
                    }
                }
            }
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnectionBuilder statusUpdateInterval(Duration duration) {
            this.statusUpdateIntervalVal = duration;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder jdbcMetadataConnection(PostgresConnection postgresConnection) {
            this.jdbcConnection = postgresConnection;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection build() {
            if ($assertionsDisabled || this.plugin != null) {
                return new PostgresReplicationConnection(this.config, this.slotName, this.publicationName, this.tableFilter, this.publicationAutocreateMode, this.plugin, this.dropSlotOnClose, this.statusUpdateIntervalVal, this.jdbcConnection, this.typeRegistry, this.slotStreamParams, this.schema);
            }
            throw new AssertionError("Decoding plugin name is not set");
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder withTypeRegistry(TypeRegistry typeRegistry) {
            this.typeRegistry = typeRegistry;
            return this;
        }

        @Override // io.debezium.connector.postgresql.connection.ReplicationConnection.Builder
        public ReplicationConnection.Builder withSchema(PostgresSchema postgresSchema) {
            this.schema = postgresSchema;
            return this;
        }

        static {
            $assertionsDisabled = !PostgresReplicationConnection.class.desiredAssertionStatus();
        }
    }

    private PostgresReplicationConnection(PostgresConnectorConfig postgresConnectorConfig, String str, String str2, RelationalTableFilters relationalTableFilters, PostgresConnectorConfig.AutoCreateMode autoCreateMode, PostgresConnectorConfig.LogicalDecoder logicalDecoder, boolean z, Duration duration, PostgresConnection postgresConnection, TypeRegistry typeRegistry, Properties properties, PostgresSchema postgresSchema) {
        super(addDefaultSettings(postgresConnectorConfig.getJdbcConfig()), PostgresConnection.FACTORY, "\"", "\"");
        this.connectorConfig = postgresConnectorConfig;
        this.slotName = str;
        this.publicationName = str2;
        this.tableFilter = relationalTableFilters;
        this.publicationAutocreateMode = autoCreateMode;
        this.plugin = logicalDecoder;
        this.dropSlotOnClose = z;
        this.statusUpdateInterval = duration;
        this.messageDecoder = logicalDecoder.messageDecoder(new MessageDecoderContext(postgresConnectorConfig, postgresSchema), postgresConnection);
        this.jdbcConnection = postgresConnection;
        this.typeRegistry = typeRegistry;
        this.streamParams = properties;
        this.slotCreationInfo = null;
        this.hasInitedSlot = false;
    }

    private static JdbcConfiguration addDefaultSettings(JdbcConfiguration jdbcConfiguration) {
        return JdbcConfiguration.adapt(PostgresConnection.addDefaultSettings(jdbcConfiguration, PostgresConnection.CONNECTION_STREAMING).edit().with("replication", "database").with("preferQueryMode", "simple").build());
    }

    private ServerInfo.ReplicationSlot getSlotInfo() throws SQLException, InterruptedException {
        PostgresConnection postgresConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_SLOT_INFO);
        try {
            ServerInfo.ReplicationSlot readReplicationSlotInfo = postgresConnection.readReplicationSlotInfo(this.slotName, this.plugin.getPostgresPluginName());
            postgresConnection.close();
            return readReplicationSlotInfo;
        } catch (Throwable th) {
            try {
                postgresConnection.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:14:0x0088. Please report as an issue. */
    protected void initPublication() {
        if (PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.equals(this.plugin)) {
            LOGGER.info("Initializing PgOutput logical decoder publication");
            try {
                BaseConnection pgConnection = pgConnection();
                pgConnection.setAutoCommit(false);
                String format = String.format("SELECT COUNT(1) FROM pg_publication WHERE pubname = '%s'", this.publicationName);
                Statement createStatement = pgConnection.createStatement();
                try {
                    ResultSet executeQuery = createStatement.executeQuery(format);
                    try {
                        if (executeQuery.next()) {
                            if (Long.valueOf(executeQuery.getLong(1)).longValue() != 0) {
                                switch (this.publicationAutocreateMode) {
                                    case FILTERED:
                                        createOrUpdatePublicationModeFilterted(null, createStatement, true);
                                        break;
                                    default:
                                        LOGGER.trace("A logical publication named '{}' for plugin '{}' and database '{}' is already active on the server and will be used by the plugin", new Object[]{this.publicationName, this.plugin, database()});
                                        break;
                                }
                            } else {
                                LOGGER.info("Creating new publication '{}' for plugin '{}'", this.publicationName, this.plugin);
                                switch (this.publicationAutocreateMode) {
                                    case DISABLED:
                                        throw new ConnectException("Publication autocreation is disabled, please create one and restart the connector.");
                                    case ALL_TABLES:
                                        String format2 = String.format("CREATE PUBLICATION %s FOR ALL TABLES;", this.publicationName);
                                        LOGGER.info("Creating Publication with statement '{}'", format2);
                                        createStatement.execute(format2);
                                        break;
                                    case FILTERED:
                                        createOrUpdatePublicationModeFilterted(null, createStatement, false);
                                        break;
                                }
                            }
                        }
                        if (executeQuery != null) {
                            executeQuery.close();
                        }
                        if (createStatement != null) {
                            createStatement.close();
                        }
                        pgConnection.commit();
                        pgConnection.setAutoCommit(true);
                    } catch (Throwable th) {
                        if (executeQuery != null) {
                            try {
                                executeQuery.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } finally {
                }
            } catch (SQLException e) {
                throw new JdbcConnectionException(e);
            }
        }
    }

    private void createOrUpdatePublicationModeFilterted(String str, Statement statement, boolean z) {
        try {
            String str2 = (String) determineCapturedTables().stream().map((v0) -> {
                return v0.toDoubleQuotedString();
            }).collect(Collectors.joining(", "));
            if (str2.isEmpty()) {
                throw new DebeziumException(String.format("No table filters found for filtered publication %s", this.publicationName));
            }
            String format = z ? String.format("ALTER PUBLICATION %s SET TABLE %s;", this.publicationName, str2) : String.format("CREATE PUBLICATION %s FOR TABLE %s;", this.publicationName, str2);
            LOGGER.info(z ? "Updating Publication with statement '{}'" : "Creating Publication with statement '{}'", format);
            statement.execute(format);
        } catch (Exception e) {
            Object[] objArr = new Object[3];
            objArr[0] = z ? "update" : "create";
            objArr[1] = this.publicationName;
            objArr[2] = str;
            throw new ConnectException(String.format("Unable to %s filtered publication %s for %s", objArr), e);
        }
    }

    private Set<TableId> determineCapturedTables() throws Exception {
        Set<TableId> allTableIds = this.jdbcConnection.getAllTableIds(this.connectorConfig.databaseName());
        HashSet hashSet = new HashSet();
        for (TableId tableId : allTableIds) {
            if (this.tableFilter.dataCollectionFilter().isIncluded(tableId)) {
                LOGGER.trace("Adding table {} to the list of captured tables", tableId);
                hashSet.add(tableId);
            } else {
                LOGGER.trace("Ignoring table {} as it's not included in the filter configuration", tableId);
            }
        }
        return (Set) hashSet.stream().sorted().collect(Collectors.toCollection(LinkedHashSet::new));
    }

    protected void initReplicationSlot() throws SQLException, InterruptedException {
        ServerInfo.ReplicationSlot slotInfo = getSlotInfo();
        boolean z = ServerInfo.ReplicationSlot.INVALID == slotInfo;
        if (z) {
            try {
                createReplicationSlot();
            } catch (SQLException e) {
                throw new JdbcConnectionException(e);
            }
        }
        pgConnection();
        LOGGER.debug("running '{}' to validate replication connection", "IDENTIFY_SYSTEM");
        Lsn lsn = (Lsn) queryAndMap("IDENTIFY_SYSTEM", resultSet -> {
            if (!resultSet.next()) {
                throw new IllegalStateException("The DB connection is not a valid replication connection");
            }
            String string = resultSet.getString("xlogpos");
            LOGGER.debug("received latest xlogpos '{}'", string);
            return Lsn.valueOf(string);
        });
        if (this.slotCreationInfo != null) {
            this.defaultStartingPos = this.slotCreationInfo.startLsn();
        } else if (z || !slotInfo.hasValidFlushedLsn()) {
            this.defaultStartingPos = lsn;
        } else {
            Lsn latestFlushedLsn = slotInfo.latestFlushedLsn();
            this.defaultStartingPos = latestFlushedLsn.compareTo(lsn) < 0 ? latestFlushedLsn : lsn;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("found previous flushed LSN '{}'", latestFlushedLsn);
            }
        }
        this.hasInitedSlot = true;
    }

    private boolean useTemporarySlot() throws SQLException {
        return false;
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public ReplicationStream startStreaming(WalPositionLocator walPositionLocator) throws SQLException, InterruptedException {
        return startStreaming(null, walPositionLocator);
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public ReplicationStream startStreaming(Lsn lsn, WalPositionLocator walPositionLocator) throws SQLException, InterruptedException {
        initConnection();
        connect();
        if (lsn == null || !lsn.isValid()) {
            lsn = this.defaultStartingPos;
        }
        Lsn lsn2 = lsn;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("starting streaming from LSN '{}'", lsn2);
        }
        int maxRetries = this.connectorConfig.maxRetries();
        Duration retryDelay = this.connectorConfig.retryDelay();
        int i = 0;
        while (true) {
            try {
                validateSlotIsInExpectedState(walPositionLocator);
                return createReplicationStream(lsn2, walPositionLocator);
            } catch (Exception e) {
                String str = "Failed to start replication stream at " + lsn2;
                i++;
                if (i > maxRetries) {
                    if (e.getMessage().matches(".*replication slot .* is active.*")) {
                        str = str + "; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
                    }
                    throw new DebeziumException(str, e);
                }
                LOGGER.warn(str + ", waiting for {} ms and retrying, attempt number {} over {}", new Object[]{retryDelay, Integer.valueOf(i), Integer.valueOf(maxRetries)});
                Metronome.sleeper(retryDelay, Clock.SYSTEM).pause();
            }
        }
    }

    protected void validateSlotIsInExpectedState(WalPositionLocator walPositionLocator) throws SQLException {
        Lsn lastCommitStoredLsn = walPositionLocator.getLastCommitStoredLsn() != null ? walPositionLocator.getLastCommitStoredLsn() : walPositionLocator.getLastEventStoredLsn();
        if (lastCommitStoredLsn == null || !this.connectorConfig.isFlushLsnOnSource()) {
            return;
        }
        try {
            Statement createStatement = pgConnection().createStatement();
            try {
                String format = String.format("SELECT pg_replication_slot_advance('%s', '%s')", this.slotName, lastCommitStoredLsn.asString());
                LOGGER.info("Seeking to {} on the replication slot with command {}", lastCommitStoredLsn, format);
                createStatement.execute(format);
                if (createStatement != null) {
                    createStatement.close();
                }
            } finally {
            }
        } catch (PSQLException e) {
            if (e.getMessage().matches("ERROR: function pg_replication_slot_advance.*does not exist(.|\\n)*") || PSQLState.UNDEFINED_FUNCTION.getState().equals(e.getSQLState())) {
                LOGGER.info("Postgres server doesn't support the command pg_replication_slot_advance(). Not seeking to last known offset.");
                return;
            }
            if (e.getMessage().matches("ERROR: must be superuser or replication role to use replication slots(.|\\n)*") || SQL_STATE_INSUFFICIENT_PRIVILEGE.equals(e.getSQLState())) {
                LOGGER.warn("Unable to use pg_replication_slot_advance() function. The Postgres server is likely on an old RDS version or privileges are not correctly set", e);
                return;
            }
            if (e.getMessage().matches("ERROR: cannot advance replication slot to.*") || PSQLState.OBJECT_NOT_IN_STATE.getState().equals(e.getSQLState())) {
                switch (AnonymousClass2.$SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[this.connectorConfig.getEventProcessingFailureHandlingMode().ordinal()]) {
                    case 1:
                    case 2:
                        LOGGER.warn("Cannot seek to the last known offset '{}' on replication slot '{}'. Error from server: '{}'", new Object[]{lastCommitStoredLsn.asString(), this.slotName, e.getMessage(), e});
                        return;
                    case 3:
                    case 4:
                        LOGGER.debug("Cannot seek to the last known offset '{}' on replication slot '{}'. Error from server: '{}'", new Object[]{lastCommitStoredLsn.asString(), this.slotName, e.getMessage(), e});
                        return;
                    default:
                        return;
                }
            }
            switch (AnonymousClass2.$SwitchMap$io$debezium$config$CommonConnectorConfig$EventProcessingFailureHandlingMode[this.connectorConfig.getEventProcessingFailureHandlingMode().ordinal()]) {
                case 1:
                    throw new DebeziumException(e);
                case 2:
                    LOGGER.warn("Unexpected error while trying to seek LSN", e);
                    return;
                case 3:
                case 4:
                    LOGGER.debug("Unexpected error while trying to seek LSN", e);
                    return;
                default:
                    return;
            }
        }
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public void initConnection() throws SQLException, InterruptedException {
        initPublication();
        if (this.hasInitedSlot) {
            return;
        }
        initReplicationSlot();
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public Optional<SlotCreationResult> createReplicationSlot() throws SQLException {
        LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", this.slotName, this.plugin);
        boolean haveMinimumServerVersion = pgConnection().haveMinimumServerVersion(ServerVersion.v9_4);
        if (this.dropSlotOnClose && !haveMinimumServerVersion) {
            LOGGER.warn("A slot marked as temporary or with an exported snapshot was created, but not on a supported version of Postgres, ignoring!");
        }
        String str = useTemporarySlot() ? "TEMPORARY" : "";
        initPublication();
        Statement createStatement = pgConnection().createStatement();
        try {
            String format = String.format("CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s", this.slotName, str, this.plugin.getPostgresPluginName());
            LOGGER.info("Creating replication slot with command {}", format);
            createStatement.execute(format);
            if (haveMinimumServerVersion) {
                this.slotCreationInfo = parseSlotCreation(createStatement.getResultSet());
            }
            Optional<SlotCreationResult> ofNullable = Optional.ofNullable(this.slotCreationInfo);
            if (createStatement != null) {
                createStatement.close();
            }
            return ofNullable;
        } catch (Throwable th) {
            if (createStatement != null) {
                try {
                    createStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected BaseConnection pgConnection() throws SQLException {
        return connection(false);
    }

    private SlotCreationResult parseSlotCreation(ResultSet resultSet) {
        try {
            if (resultSet.next()) {
                return new SlotCreationResult(resultSet.getString("slot_name"), resultSet.getString("consistent_point"), resultSet.getString("snapshot_name"), resultSet.getString("output_plugin"));
            }
            throw new ConnectException("No replication slot found");
        } catch (SQLException e) {
            throw new ConnectException("Unable to parse create_replication_slot response", e);
        }
    }

    private ReplicationStream createReplicationStream(final Lsn lsn, final WalPositionLocator walPositionLocator) throws SQLException, InterruptedException {
        PGReplicationStream startPgReplicationStream;
        try {
            try {
                MessageDecoder messageDecoder = this.messageDecoder;
                Objects.requireNonNull(messageDecoder);
                startPgReplicationStream = startPgReplicationStream(lsn, messageDecoder::defaultOptions);
            } catch (PSQLException e) {
                LOGGER.debug("Could not register for streaming, retrying without optional options", e);
                if (useTemporarySlot()) {
                    initReplicationSlot();
                }
                MessageDecoder messageDecoder2 = this.messageDecoder;
                Objects.requireNonNull(messageDecoder2);
                startPgReplicationStream = startPgReplicationStream(lsn, messageDecoder2::defaultOptions);
            }
            final PGReplicationStream pGReplicationStream = startPgReplicationStream;
            return new ReplicationStream() { // from class: io.debezium.connector.postgresql.connection.PostgresReplicationConnection.1
                private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
                private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
                private ExecutorService keepAliveExecutor = null;
                private AtomicBoolean keepAliveRunning;
                private final Metronome metronome;
                private volatile Lsn lastReceivedLsn;

                {
                    this.metronome = Metronome.sleeper(PostgresReplicationConnection.this.statusUpdateInterval, Clock.SYSTEM);
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public void read(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException, InterruptedException {
                    processWarnings(false);
                    ByteBuffer read = pGReplicationStream.read();
                    Lsn valueOf = Lsn.valueOf(pGReplicationStream.getLastReceiveLSN());
                    PostgresReplicationConnection.LOGGER.trace("Streaming requested from LSN {}, received LSN {}", lsn, valueOf);
                    if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(read, valueOf, lsn, walPositionLocator)) {
                        return;
                    }
                    deserializeMessages(read, replicationMessageProcessor);
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public boolean readPending(ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException, InterruptedException {
                    processWarnings(false);
                    ByteBuffer readPending = pGReplicationStream.readPending();
                    Lsn valueOf = Lsn.valueOf(pGReplicationStream.getLastReceiveLSN());
                    PostgresReplicationConnection.LOGGER.trace("Streaming requested from LSN {}, received LSN {}", lsn, valueOf);
                    if (readPending == null) {
                        return false;
                    }
                    if (PostgresReplicationConnection.this.messageDecoder.shouldMessageBeSkipped(readPending, valueOf, lsn, walPositionLocator)) {
                        return true;
                    }
                    deserializeMessages(readPending, replicationMessageProcessor);
                    return true;
                }

                private void deserializeMessages(ByteBuffer byteBuffer, ReplicationStream.ReplicationMessageProcessor replicationMessageProcessor) throws SQLException, InterruptedException {
                    this.lastReceivedLsn = Lsn.valueOf(pGReplicationStream.getLastReceiveLSN());
                    PostgresReplicationConnection.LOGGER.trace("Received message at LSN {}", this.lastReceivedLsn);
                    PostgresReplicationConnection.this.messageDecoder.processMessage(byteBuffer, replicationMessageProcessor, PostgresReplicationConnection.this.typeRegistry);
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream, java.lang.AutoCloseable
                public void close() throws SQLException {
                    processWarnings(true);
                    pGReplicationStream.close();
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public void flushLsn(Lsn lsn2) throws SQLException {
                    if (PostgresReplicationConnection.this.connectorConfig.isFlushLsnOnSource()) {
                        doFlushLsn(lsn2);
                    }
                }

                private void doFlushLsn(Lsn lsn2) throws SQLException {
                    pGReplicationStream.setFlushedLSN(lsn2.asLogSequenceNumber());
                    pGReplicationStream.setAppliedLSN(lsn2.asLogSequenceNumber());
                    pGReplicationStream.forceUpdateStatus();
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public Lsn lastReceivedLsn() {
                    return this.lastReceivedLsn;
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public void startKeepAlive(ExecutorService executorService) {
                    if (this.keepAliveExecutor == null) {
                        this.keepAliveExecutor = executorService;
                        this.keepAliveRunning = new AtomicBoolean(true);
                        ExecutorService executorService2 = this.keepAliveExecutor;
                        PGReplicationStream pGReplicationStream2 = pGReplicationStream;
                        executorService2.submit(() -> {
                            while (this.keepAliveRunning.get()) {
                                try {
                                    PostgresReplicationConnection.LOGGER.trace("Forcing status update with replication stream");
                                    pGReplicationStream2.forceUpdateStatus();
                                    this.metronome.pause();
                                } catch (Exception e2) {
                                    throw new RuntimeException("received unexpected exception will perform keep alive", e2);
                                }
                            }
                        });
                    }
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public void stopKeepAlive() {
                    if (this.keepAliveExecutor != null) {
                        this.keepAliveRunning.set(false);
                        this.keepAliveExecutor.shutdownNow();
                        this.keepAliveExecutor = null;
                    }
                }

                private void processWarnings(boolean z) throws SQLException {
                    int i = this.warningCheckCounter - 1;
                    this.warningCheckCounter = i;
                    if (i != 0 && !z) {
                        return;
                    }
                    this.warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
                    SQLWarning warnings = PostgresReplicationConnection.this.connection().getWarnings();
                    while (true) {
                        SQLWarning sQLWarning = warnings;
                        if (sQLWarning == null) {
                            PostgresReplicationConnection.this.connection().clearWarnings();
                            return;
                        } else {
                            PostgresReplicationConnection.LOGGER.debug("Server-side message: '{}', state = {}, code = {}", new Object[]{sQLWarning.getMessage(), sQLWarning.getSQLState(), Integer.valueOf(sQLWarning.getErrorCode())});
                            warnings = sQLWarning.getNextWarning();
                        }
                    }
                }

                @Override // io.debezium.connector.postgresql.connection.ReplicationStream
                public Lsn startLsn() {
                    return lsn;
                }
            };
        } catch (PSQLException e2) {
            if (!e2.getMessage().matches("(?s)ERROR: requested WAL segment .* has already been removed.*")) {
                throw e2;
            }
            LOGGER.error("Cannot rewind to last processed WAL position", e2);
            throw new ConnectException("The offset to start reading from has been removed from the database write-ahead log. Create a new snapshot and consider setting of PostgreSQL parameter wal_keep_segments = 0.");
        }
    }

    private PGReplicationStream startPgReplicationStream(Lsn lsn, BiFunction<ChainedLogicalStreamBuilder, Function<Integer, Boolean>, ChainedLogicalStreamBuilder> biFunction) throws SQLException {
        if (!$assertionsDisabled && lsn == null) {
            throw new AssertionError();
        }
        ChainedLogicalStreamBuilder apply = biFunction.apply(pgConnection().getReplicationAPI().replicationStream().logical().withSlotName("\"" + this.slotName + "\"").withStartPosition(lsn.asLogSequenceNumber()).withSlotOptions(this.streamParams), (v1) -> {
            return hasMinimumVersion(v1);
        });
        if (this.statusUpdateInterval != null && this.statusUpdateInterval.toMillis() > 0) {
            apply.withStatusInterval(Math.toIntExact(this.statusUpdateInterval.toMillis()), TimeUnit.MILLISECONDS);
        }
        PGReplicationStream start = apply.start();
        try {
            Thread.sleep(10L);
        } catch (Exception e) {
        }
        start.forceUpdateStatus();
        return start;
    }

    private Boolean hasMinimumVersion(int i) {
        try {
            return Boolean.valueOf(pgConnection().haveMinimumServerVersion(i));
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        close(true);
    }

    public synchronized void close(boolean z) {
        try {
            LOGGER.debug("Closing message decoder");
            this.messageDecoder.close();
        } catch (Throwable th) {
            LOGGER.error("Unexpected error while closing message decoder", th);
        }
        try {
            LOGGER.debug("Closing replication connection");
            super.close();
        } catch (Throwable th2) {
            LOGGER.error("Unexpected error while closing Postgres connection", th2);
        }
        if (this.dropSlotOnClose && z) {
            try {
                PostgresConnection postgresConnection = new PostgresConnection(this.connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_DROP_SLOT);
                try {
                    postgresConnection.dropReplicationSlot(this.slotName);
                    postgresConnection.close();
                } finally {
                }
            } catch (Throwable th3) {
                LOGGER.error("Unexpected error while dropping replication slot", th3);
            }
        }
    }

    @Override // io.debezium.connector.postgresql.connection.ReplicationConnection
    public void reconnect() throws SQLException {
        close(false);
        connection(false);
    }

    static {
        $assertionsDisabled = !PostgresReplicationConnection.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
    }
}
