package io.debezium.connector.postgresql;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresReadOnlyIncrementalSnapshotChangeEventSource.class */
public class PostgresReadOnlyIncrementalSnapshotChangeEventSource<P extends PostgresPartition> extends AbstractIncrementalSnapshotChangeEventSource<P, TableId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresReadOnlyIncrementalSnapshotChangeEventSource.class);
    private static final String FORCE_NEW_TRANSACTION = "SELECT * FROM pg_current_xact_id();";
    private static final String CURRENT_SNAPSHOT = "SELECT * FROM pg_current_snapshot();";
    private final PostgresConnection jdbcConnection;
    private final PostgresSchema schema;

    public PostgresReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, EventDispatcher<P, TableId> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener<P> snapshotProgressListener, DataChangeEventListener<P> dataChangeEventListener, NotificationService<P, ? extends OffsetContext> notificationService) {
        super(relationalDatabaseConnectorConfig, jdbcConnection, eventDispatcher, databaseSchema, clock, snapshotProgressListener, dataChangeEventListener, notificationService);
        this.jdbcConnection = (PostgresConnection) jdbcConnection;
        this.schema = (PostgresSchema) databaseSchema;
    }

    protected void preIncrementalSnapshotStart() {
        super.preIncrementalSnapshotStart();
        forceNewTransactionId();
    }

    private PostgresReadOnlyIncrementalSnapshotContext<TableId> getContext() {
        return this.context;
    }

    protected void emitWindowOpen() {
        PostgresReadOnlyIncrementalSnapshotContext<TableId> context = getContext();
        Objects.requireNonNull(context);
        getCurrentSnapshot(context::setLowWatermark);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitWindowClose(P p, OffsetContext offsetContext) {
        PostgresReadOnlyIncrementalSnapshotContext<TableId> context = getContext();
        Objects.requireNonNull(context);
        getCurrentSnapshot(context::setHighWatermark);
    }

    public void processMessage(P p, DataCollectionId dataCollectionId, Object obj, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.debug("Checking window for table '{}', key '{}', window contains '{}'", new Object[]{dataCollectionId, obj, this.window});
        getContext().updateWindowState(offsetContext);
        boolean isWindowClosed = getContext().isWindowClosed();
        if (getContext().snapshotRunning() && isWindowClosed) {
            sendWindowEvents(p, offsetContext);
            readChunk(p, offsetContext);
        } else {
            if (this.window.isEmpty() || !getContext().deduplicationNeeded()) {
                return;
            }
            LOGGER.trace("Deduplicating");
            deduplicateWindow(dataCollectionId, obj);
        }
    }

    public void processTransactionCommittedEvent(P p, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Processing transaction event");
        readUntilNewTransactionChange(p, offsetContext);
        LOGGER.trace("Finished processing transaction event");
    }

    public void processHeartbeat(P p, OffsetContext offsetContext) throws InterruptedException {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return;
        }
        LOGGER.trace("Processing heartbeat event");
        readUntilNewTransactionChange(p, offsetContext);
        LOGGER.trace("Finished processing heartbeat event");
    }

    protected Table refreshTableSchema(Table table) throws SQLException {
        LOGGER.debug("Refreshing table '{}' schema for incremental snapshot.", table.id());
        this.schema.refreshFromIncrementalSnapshot(this.jdbcConnection, table.id());
        return this.schema.tableFor(table.id());
    }

    private void readUntilNewTransactionChange(P p, OffsetContext offsetContext) throws InterruptedException {
        Long int64 = offsetContext.getSourceInfo().getInt64(SourceInfo.TXID_KEY);
        LOGGER.debug("Event txId {}, snapshot is running {}, reachedHighWatermark {}", new Object[]{int64, Boolean.valueOf(getContext().snapshotRunning()), Boolean.valueOf(getContext().isTransactionVisible(int64))});
        LOGGER.trace("Incremental snapshot context {}", getContext());
        if (getContext().snapshotRunning() && maxInProgressTransactionCommitted(int64)) {
            getContext().closeWindow();
            sendWindowEvents(p, offsetContext);
            readChunk(p, offsetContext);
            return;
        }
        while (getContext().snapshotRunning() && getContext().isTransactionVisible(int64)) {
            LOGGER.debug("Finishing snapshot, snapshot is running {}, reachedHighWatermark {}", Boolean.valueOf(getContext().snapshotRunning()), Boolean.valueOf(getContext().isTransactionVisible(int64)));
            getContext().closeWindow();
            sendWindowEvents(p, offsetContext);
            readChunk(p, offsetContext);
            if (getContext().watermarksChanged()) {
                LOGGER.trace("Watermarks changed");
                return;
            }
            LOGGER.trace("Re read chunk finished, snapshot is running {}, reachedHighWatermark {}", Boolean.valueOf(getContext().snapshotRunning()), Boolean.valueOf(getContext().isTransactionVisible(int64)));
        }
    }

    private void getCurrentSnapshot(Consumer<PgSnapshot> consumer) {
        try {
            consumer.accept((PgSnapshot) this.jdbcConnection.queryAndMap(CURRENT_SNAPSHOT, this.jdbcConnection.singleResultMapper(resultSet -> {
                String string = resultSet.getString(1);
                LOGGER.trace("Current snapshot {}", string);
                return PgSnapshot.valueOf(string);
            }, "Unable to get current snapshot")));
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    private boolean maxInProgressTransactionCommitted(Long l) {
        if (getContext().getHighWatermark() == null) {
            return false;
        }
        return getContext().getHighWatermark().getXMax().equals(l);
    }

    private void forceNewTransactionId() {
        try {
            this.jdbcConnection.query(FORCE_NEW_TRANSACTION, resultSet -> {
                if (resultSet.next()) {
                    LOGGER.trace("Created new transaction ID {}", resultSet.getString(1));
                }
            });
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }
}
