package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.connector.mysql.signal.ExecuteSnapshotKafkaSignal;
import io.debezium.connector.mysql.signal.KafkaSignalThread;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.class */
/**
 * Incremental snapshot change event source for MySQL that takes its window watermarks from the
 * executed GTID set reported by {@code SHOW MASTER STATUS} and receives snapshot requests through
 * a {@link KafkaSignalThread}.
 *
 * <p>Every {@code process*} callback is a no-op (apart from a warning log entry) while no
 * incremental snapshot context has been set.
 *
 * @param <T> the data collection identifier type handled by this source
 */
public class MySqlReadOnlyIncrementalSnapshotChangeEventSource<T extends DataCollectionId> extends AbstractIncrementalSnapshotChangeEventSource<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MySqlReadOnlyIncrementalSnapshotChangeEventSource.class);

    /** Statement whose result row carries the executed GTID set. */
    private static final String SHOW_MASTER_STMT = "SHOW MASTER STATUS";

    /** 1-based index of the result column that is read as the executed GTID set. */
    private static final int EXECUTED_GTID_SET_COLUMN = 5;

    private final KafkaSignalThread<T> kafkaSignal;

    /**
     * Creates the event source and the Kafka signal thread that feeds it.
     *
     * <p>All arguments except the connector configuration are only forwarded to the superclass
     * constructor; the configuration is additionally handed to the {@link KafkaSignalThread}.
     */
    public MySqlReadOnlyIncrementalSnapshotChangeEventSource(RelationalDatabaseConnectorConfig relationalDatabaseConnectorConfig, JdbcConnection jdbcConnection, EventDispatcher<T> eventDispatcher, DatabaseSchema<?> databaseSchema, Clock clock, SnapshotProgressListener snapshotProgressListener, DataChangeEventListener dataChangeEventListener) {
        super(relationalDatabaseConnectorConfig, jdbcConnection, eventDispatcher, databaseSchema, clock, snapshotProgressListener, dataChangeEventListener);
        this.kafkaSignal = new KafkaSignalThread<>(MySqlConnector.class, relationalDatabaseConnectorConfig, this);
    }

    /**
     * Initializes the superclass state, repositions the signal thread to the signal offset stored
     * in the context (when one is present) and then starts the signal thread.
     *
     * @param offsetContext the offset context passed on to {@code super.init}
     */
    public void init(OffsetContext offsetContext) {
        super.init(offsetContext);
        Long signalOffset = getContext().getSignalOffset();
        if (signalOffset != null) {
            this.kafkaSignal.seek(signalOffset.longValue());
        }
        this.kafkaSignal.start();
    }

    /**
     * Handles a data change event: applies pending snapshot signals, then either closes the
     * current window (when the window state update reports so) or de-duplicates the event's key
     * against the buffered window.
     *
     * @param partition        the partition passed on when window events are sent
     * @param dataCollectionId the data collection the event belongs to
     * @param obj              the key of the event
     * @param offsetContext    the offset context of the event
     * @throws InterruptedException propagated from the snapshot operations invoked here
     */
    public void processMessage(Partition partition, DataCollectionId dataCollectionId, Object obj, OffsetContext offsetContext) throws InterruptedException {
        if (isContextMissing()) {
            return;
        }
        checkEnqueuedSnapshotSignals(offsetContext);
        LOGGER.trace("Checking window for table '{}', key '{}', window contains '{}'", dataCollectionId, obj, this.window);
        if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(partition, offsetContext);
            readChunk();
        } else if (!this.window.isEmpty() && getContext().deduplicationNeeded()) {
            deduplicateWindow(dataCollectionId, obj);
        }
    }

    /**
     * Handles a heartbeat: applies pending snapshot signals, then keeps sending window events and
     * reading the next chunk for as long as a snapshot is running and the context reports that the
     * high watermark has been reached.
     *
     * @throws InterruptedException propagated from the snapshot operations invoked here
     */
    public void processHeartbeat(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        if (isContextMissing()) {
            return;
        }
        checkEnqueuedSnapshotSignals(offsetContext);
        closeWindowsWhileHighWatermarkReached(partition, offsetContext);
    }

    /**
     * Handles an event that was filtered out: applies pending snapshot signals and closes the
     * current window when the window state update reports so.
     *
     * @throws InterruptedException propagated from the snapshot operations invoked here
     */
    public void processFilteredEvent(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        if (isContextMissing()) {
            return;
        }
        checkEnqueuedSnapshotSignals(offsetContext);
        closeWindowIfStateUpdated(partition, offsetContext);
    }

    /**
     * Queues data collections for snapshotting by delegating to the context.
     *
     * <p>Unlike the {@code process*} callbacks this method does not guard against a missing
     * context.
     *
     * @param list names of the data collections to snapshot
     * @param j    offset of the signal that requested the snapshot
     */
    public void enqueueDataCollectionNamesToSnapshot(List<String> list, long j) {
        getContext().enqueueDataCollectionsToSnapshot(list, j);
    }

    /**
     * Handles a transaction start: closes the current window when the window state update reports
     * so. Pending snapshot signals are not applied here.
     *
     * @throws InterruptedException propagated from the snapshot operations invoked here
     */
    public void processTransactionStartedEvent(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        if (isContextMissing()) {
            return;
        }
        closeWindowIfStateUpdated(partition, offsetContext);
    }

    /**
     * Handles a transaction commit: keeps sending window events and reading the next chunk for as
     * long as a snapshot is running and the context reports that the high watermark has been
     * reached. Pending snapshot signals are not applied here.
     *
     * @throws InterruptedException propagated from the snapshot operations invoked here
     */
    public void processTransactionCommittedEvent(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        if (isContextMissing()) {
            return;
        }
        closeWindowsWhileHighWatermarkReached(partition, offsetContext);
    }

    /**
     * Stores the currently executed GTID set as the low watermark and commits the JDBC connection.
     *
     * @throws DebeziumException if the query or the commit fails with an {@link SQLException}
     */
    protected void updateLowWatermark() {
        try {
            getExecutedGtidSet(getContext()::setLowWatermark);
            // NOTE(review): the commit presumably ensures the following chunk read observes
            // changes committed before the watermark was taken - confirm against the caller.
            this.jdbcConnection.commit();
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    /**
     * Stores the currently executed GTID set as the high watermark. No commit is issued here.
     *
     * @throws DebeziumException if the query fails with an {@link SQLException}
     */
    protected void updateHighWatermark() {
        getExecutedGtidSet(getContext()::setHighWatermark);
    }

    /**
     * Runs {@code SHOW MASTER STATUS} and passes the executed GTID set of the first row to the
     * given consumer. The consumer is not invoked when the statement returns no row.
     *
     * @param consumer receiver of the parsed GTID set
     * @throws UnsupportedOperationException if the result has fewer than five columns
     * @throws DebeziumException             if the query fails with an {@link SQLException}
     */
    private void getExecutedGtidSet(Consumer<GtidSet> consumer) {
        try {
            this.jdbcConnection.query(SHOW_MASTER_STMT, resultSet -> {
                if (!resultSet.next()) {
                    return;
                }
                if (resultSet.getMetaData().getColumnCount() < EXECUTED_GTID_SET_COLUMN) {
                    throw new UnsupportedOperationException("Need to add support for executed GTIDs for versions prior to 5.6.5");
                }
                consumer.accept(new GtidSet(resultSet.getString(EXECUTED_GTID_SET_COLUMN)));
            });
        } catch (SQLException e) {
            throw new DebeziumException(e);
        }
    }

    /** Opens a window by recording the low watermark. */
    protected void emitWindowOpen() {
        updateLowWatermark();
    }

    /**
     * Closes a window by recording the high watermark; when the context reports that the server
     * UUID changed, the current chunk is read again.
     *
     * @throws InterruptedException propagated from {@link #rereadChunk()}
     */
    protected void emitWindowClose() throws InterruptedException {
        updateHighWatermark();
        if (getContext().serverUuidChanged()) {
            rereadChunk();
        }
    }

    /**
     * Discards the buffered window, reverts the current chunk in the context and reads it again.
     *
     * <p>Does nothing unless a context exists, a snapshot is running, de-duplication is needed and
     * the window is non-empty.
     *
     * @throws InterruptedException propagated from {@code readChunk}
     */
    public void rereadChunk() throws InterruptedException {
        final MySqlReadOnlyIncrementalSnapshotContext<T> current = getContext();
        if (current == null || !current.snapshotRunning() || !current.deduplicationNeeded() || this.window.isEmpty()) {
            return;
        }
        this.window.clear();
        current.revertChunk();
        readChunk();
    }

    /** Applies every execute-snapshot signal that is currently queued in the context. */
    private void checkEnqueuedSnapshotSignals(OffsetContext offsetContext) throws InterruptedException {
        while (getContext().hasExecuteSnapshotSignals()) {
            addDataCollectionNamesToSnapshot(getContext().getExecuteSnapshotSignals(), offsetContext);
        }
    }

    /**
     * Registers the data collections named by the signal for snapshotting and then records the
     * signal's offset in the context.
     */
    private void addDataCollectionNamesToSnapshot(ExecuteSnapshotKafkaSignal executeSnapshotKafkaSignal, OffsetContext offsetContext) throws InterruptedException {
        super.addDataCollectionNamesToSnapshot(executeSnapshotKafkaSignal.getDataCollections(), offsetContext);
        getContext().setSignalOffset(Long.valueOf(executeSnapshotKafkaSignal.getSignalOffset()));
    }

    /** Logs a warning and returns {@code true} when there is no context to work with. */
    private boolean isContextMissing() {
        if (getContext() == null) {
            LOGGER.warn("Context is null, skipping message processing");
            return true;
        }
        return false;
    }

    /** Sends the window events and reads the next chunk if the window state update reports so. */
    private void closeWindowIfStateUpdated(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        if (getContext().updateWindowState(offsetContext)) {
            sendWindowEvents(partition, offsetContext);
            readChunk();
        }
    }

    /**
     * Repeatedly sends the window events and reads the next chunk while a snapshot is running and
     * the high watermark has been reached for the given offset.
     */
    private void closeWindowsWhileHighWatermarkReached(Partition partition, OffsetContext offsetContext) throws InterruptedException {
        while (getContext().snapshotRunning() && getContext().reachedHighWatermark(offsetContext)) {
            sendWindowEvents(partition, offsetContext);
            readChunk();
        }
    }

    /**
     * Returns the inherited context narrowed to the MySQL read-only context type.
     *
     * <p>NOTE(review): the inherited {@code context} field is presumably declared with a more
     * general context type in the superclass, which makes the cast necessary - confirm.
     */
    @SuppressWarnings("unchecked")
    private MySqlReadOnlyIncrementalSnapshotContext<T> getContext() {
        return (MySqlReadOnlyIncrementalSnapshotContext<T>) this.context;
    }
}
