package io.debezium.connector.db2;

import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeTableResultSet;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/db2/Db2StreamingChangeEventSource.class */
public class Db2StreamingChangeEventSource implements StreamingChangeEventSource<Db2Partition, Db2OffsetContext> {
    private static final int COL_COMMIT_LSN = 2;
    private static final int COL_ROW_LSN = 3;
    private static final int COL_OPERATION = 1;
    private static final int COL_DATA = 5;
    private static final Pattern MISSING_CDC_FUNCTION_CHANGES_ERROR = Pattern.compile("Invalid object name 'cdc.fn_cdc_get_all_changes_(.*)'\\.");
    private static final Logger LOGGER = LoggerFactory.getLogger(Db2StreamingChangeEventSource.class);
    private final Db2Connection dataConnection;
    private final Db2Connection metadataConnection;
    private final EventDispatcher<TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final Db2DatabaseSchema schema;
    private final Duration pollInterval;
    private final Db2ConnectorConfig connectorConfig;

    /* loaded from: input_file:io/debezium/connector/db2/Db2StreamingChangeEventSource$ChangeTablePointer.class */
    private static class ChangeTablePointer extends ChangeTableResultSet<Db2ChangeTable, TxLogPosition> {
        public ChangeTablePointer(Db2ChangeTable db2ChangeTable, ResultSet resultSet) {
            super(db2ChangeTable, resultSet, Db2StreamingChangeEventSource.COL_DATA);
        }

        protected int getOperation(ResultSet resultSet) throws SQLException {
            return resultSet.getInt(1);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: getNextChangePosition, reason: merged with bridge method [inline-methods] */
        public TxLogPosition m11getNextChangePosition(ResultSet resultSet) throws SQLException {
            return isCompleted() ? TxLogPosition.NULL : TxLogPosition.valueOf(Lsn.valueOf(resultSet.getBytes(2)), Lsn.valueOf(resultSet.getBytes(3)));
        }
    }

    public Db2StreamingChangeEventSource(Db2ConnectorConfig db2ConnectorConfig, Db2Connection db2Connection, Db2Connection db2Connection2, EventDispatcher<TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, Db2DatabaseSchema db2DatabaseSchema) {
        this.connectorConfig = db2ConnectorConfig;
        this.dataConnection = db2Connection;
        this.metadataConnection = db2Connection2;
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = db2DatabaseSchema;
        this.pollInterval = db2ConnectorConfig.getPollInterval();
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, Db2Partition db2Partition, Db2OffsetContext db2OffsetContext) throws InterruptedException {
        Metronome sleeper = Metronome.sleeper(this.pollInterval, this.clock);
        PriorityQueue priorityQueue = new PriorityQueue((db2ChangeTable, db2ChangeTable2) -> {
            return db2ChangeTable.getStopLsn().compareTo(db2ChangeTable2.getStopLsn());
        });
        try {
            AtomicReference atomicReference = new AtomicReference(getCdcTablesToQuery(db2Partition, db2OffsetContext));
            TxLogPosition changePosition = db2OffsetContext.getChangePosition();
            long eventSerialNo = db2OffsetContext.getEventSerialNo();
            LOGGER.info("Last position recorded in offsets is {}[{}]", changePosition, Long.valueOf(eventSerialNo));
            TxLogPosition txLogPosition = changePosition;
            boolean isSnapshotCompleted = db2OffsetContext.isSnapshotCompleted();
            while (changeEventSourceContext.isRunning()) {
                Lsn maxLsn = this.dataConnection.getMaxLsn();
                if (!maxLsn.isAvailable()) {
                    LOGGER.warn("No maximum LSN recorded in the database; please ensure that the DB2 Agent is running");
                    sleeper.pause();
                } else if (maxLsn.equals(txLogPosition.getCommitLsn()) && isSnapshotCompleted) {
                    LOGGER.debug("No change in the database");
                    sleeper.pause();
                } else {
                    Lsn incrementLsn = (txLogPosition.getCommitLsn().isAvailable() && isSnapshotCompleted) ? this.dataConnection.incrementLsn(txLogPosition.getCommitLsn()) : txLogPosition.getCommitLsn();
                    isSnapshotCompleted = true;
                    while (!priorityQueue.isEmpty()) {
                        migrateTable(db2Partition, db2OffsetContext, priorityQueue);
                    }
                    if (!this.dataConnection.listOfNewChangeTables(incrementLsn, maxLsn).isEmpty()) {
                        Db2ChangeTable[] cdcTablesToQuery = getCdcTablesToQuery(db2Partition, db2OffsetContext);
                        atomicReference.set(cdcTablesToQuery);
                        for (Db2ChangeTable db2ChangeTable3 : cdcTablesToQuery) {
                            if (db2ChangeTable3.getStartLsn().isBetween(incrementLsn, maxLsn)) {
                                LOGGER.info("Schema will be changed for {}", db2ChangeTable3);
                                priorityQueue.add(db2ChangeTable3);
                            }
                        }
                    }
                    try {
                        this.dataConnection.getChangesForTables((Db2ChangeTable[]) atomicReference.get(), incrementLsn, maxLsn, resultSetArr -> {
                            ChangeTablePointer changeTablePointer;
                            TableId sourceTableId;
                            long j = 1;
                            int length = resultSetArr.length;
                            ChangeTablePointer[] changeTablePointerArr = new ChangeTablePointer[length];
                            Db2ChangeTable[] db2ChangeTableArr = (Db2ChangeTable[]) atomicReference.get();
                            for (int i = 0; i < length; i++) {
                                changeTablePointerArr[i] = new ChangeTablePointer(db2ChangeTableArr[i], resultSetArr[i]);
                                changeTablePointerArr[i].next();
                            }
                            while (true) {
                                changeTablePointer = null;
                                for (ChangeTablePointer changeTablePointer2 : changeTablePointerArr) {
                                    if (!changeTablePointer2.isCompleted() && (changeTablePointer == null || changeTablePointer2.compareTo(changeTablePointer) < 0)) {
                                        changeTablePointer = changeTablePointer2;
                                    }
                                }
                                if (changeTablePointer == null) {
                                    return;
                                }
                                if (!((TxLogPosition) changeTablePointer.getChangePosition()).isAvailable() || !((TxLogPosition) changeTablePointer.getChangePosition()).getInTxLsn().isAvailable()) {
                                    LOGGER.error("Skipping change {} as its LSN is NULL which is not expected", changeTablePointer);
                                    changeTablePointer.next();
                                } else if (((TxLogPosition) changeTablePointer.getChangePosition()).compareTo(changePosition) < 0) {
                                    LOGGER.info("Skipping change {} as its position is smaller than the last recorded position {}", changeTablePointer, changePosition);
                                    changeTablePointer.next();
                                } else if (((TxLogPosition) changeTablePointer.getChangePosition()).compareTo(changePosition) == 0 && j <= eventSerialNo) {
                                    LOGGER.info("Skipping change {} as its order in the transaction {} is smaller than or equal to the last recorded operation {}[{}]", new Object[]{changeTablePointer, Long.valueOf(j), changePosition, Long.valueOf(eventSerialNo)});
                                    j++;
                                    changeTablePointer.next();
                                } else if (!((Db2ChangeTable) changeTablePointer.getChangeTable()).getStopLsn().isAvailable() || ((Db2ChangeTable) changeTablePointer.getChangeTable()).getStopLsn().compareTo(((TxLogPosition) changeTablePointer.getChangePosition()).getCommitLsn()) > 0) {
                                    LOGGER.trace("Processing change {}", changeTablePointer);
                                    if (!priorityQueue.isEmpty() && ((TxLogPosition) changeTablePointer.getChangePosition()).getCommitLsn().compareTo(((Db2ChangeTable) priorityQueue.peek()).getStopLsn()) >= 0) {
                                        migrateTable(db2Partition, db2OffsetContext, priorityQueue);
                                    }
                                    sourceTableId = ((Db2ChangeTable) changeTablePointer.getChangeTable()).getSourceTableId();
                                    int operation = changeTablePointer.getOperation();
                                    Object[] data = changeTablePointer.getData();
                                    int i2 = 1;
                                    if (operation == 3) {
                                        if (!changeTablePointer.next() || changeTablePointer.getOperation() != 4) {
                                            break;
                                        } else {
                                            i2 = 2;
                                        }
                                    }
                                    Object[] data2 = operation == 3 ? changeTablePointer.getData() : null;
                                    db2OffsetContext.setChangePosition((TxLogPosition) changeTablePointer.getChangePosition(), i2);
                                    db2OffsetContext.event(((Db2ChangeTable) changeTablePointer.getChangeTable()).getSourceTableId(), this.metadataConnection.timestampOfLsn(((TxLogPosition) changeTablePointer.getChangePosition()).getCommitLsn()));
                                    this.dispatcher.dispatchDataChangeEvent(sourceTableId, new Db2ChangeRecordEmitter(db2Partition, db2OffsetContext, operation, data, data2, this.clock));
                                    changeTablePointer.next();
                                } else {
                                    LOGGER.debug("Skipping table change {} as its stop LSN is smaller than the last recorded LSN {}", changeTablePointer, changeTablePointer.getChangePosition());
                                    changeTablePointer.next();
                                }
                            }
                            throw new IllegalStateException("The update before event at " + changeTablePointer.getChangePosition() + " for table " + sourceTableId + " was not followed by after event.\n Please report this as a bug together with a events around given LSN.");
                        });
                        txLogPosition = TxLogPosition.valueOf(maxLsn);
                        this.dataConnection.rollback();
                    } catch (SQLException e) {
                        atomicReference.set(processErrorFromChangeTableQuery(e, (Db2ChangeTable[]) atomicReference.get()));
                    }
                }
            }
        } catch (Exception e2) {
            this.errorHandler.setProducerThrowable(e2);
        }
    }

    private void migrateTable(Db2Partition db2Partition, Db2OffsetContext db2OffsetContext, Queue<Db2ChangeTable> queue) throws InterruptedException, SQLException {
        Db2ChangeTable poll = queue.poll();
        LOGGER.info("Migrating schema to {}", poll);
        this.dispatcher.dispatchSchemaChangeEvent(poll.getSourceTableId(), new Db2SchemaChangeEventEmitter(db2Partition, db2OffsetContext, poll, this.metadataConnection.getTableSchemaFromTable(poll), SchemaChangeEvent.SchemaChangeEventType.ALTER));
    }

    private Db2ChangeTable[] processErrorFromChangeTableQuery(SQLException sQLException, Db2ChangeTable[] db2ChangeTableArr) throws Exception {
        Matcher matcher = MISSING_CDC_FUNCTION_CHANGES_ERROR.matcher(sQLException.getMessage());
        if (!matcher.matches()) {
            throw sQLException;
        }
        String group = matcher.group(1);
        LOGGER.info("Table is no longer captured with capture instance {}", group);
        return (Db2ChangeTable[]) ((List) Arrays.asList(db2ChangeTableArr).stream().filter(db2ChangeTable -> {
            return !db2ChangeTable.getCaptureInstance().equals(group);
        }).collect(Collectors.toList())).toArray(new Db2ChangeTable[0]);
    }

    private Db2ChangeTable[] getCdcTablesToQuery(Db2Partition db2Partition, Db2OffsetContext db2OffsetContext) throws SQLException, InterruptedException {
        Db2ChangeTable db2ChangeTable;
        Set<Db2ChangeTable> listOfChangeTables = this.dataConnection.listOfChangeTables();
        if (listOfChangeTables.isEmpty()) {
            LOGGER.warn("No table has enabled CDC or security constraints prevents getting the list of change tables");
        }
        Map map = (Map) listOfChangeTables.stream().filter(db2ChangeTable2 -> {
            if (this.connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(db2ChangeTable2.getSourceTableId())) {
                return true;
            }
            LOGGER.info("CDC is enabled for table {} but the table is not included by connector", db2ChangeTable2);
            return false;
        }).collect(Collectors.groupingBy(db2ChangeTable3 -> {
            return db2ChangeTable3.getSourceTableId();
        }));
        if (map.isEmpty()) {
            LOGGER.warn("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!");
        }
        ArrayList arrayList = new ArrayList();
        for (List list : map.values()) {
            Db2ChangeTable db2ChangeTable4 = (Db2ChangeTable) list.get(0);
            if (list.size() > 1) {
                if (((Db2ChangeTable) list.get(0)).getStartLsn().compareTo(((Db2ChangeTable) list.get(1)).getStartLsn()) < 0) {
                    db2ChangeTable = (Db2ChangeTable) list.get(1);
                } else {
                    db2ChangeTable4 = (Db2ChangeTable) list.get(1);
                    db2ChangeTable = (Db2ChangeTable) list.get(0);
                }
                db2ChangeTable4.setStopLsn(db2ChangeTable.getStartLsn());
                arrayList.add(db2ChangeTable);
                LOGGER.info("Multiple capture instances present for the same table: {} and {}", db2ChangeTable4, db2ChangeTable);
            }
            if (this.schema.tableFor(db2ChangeTable4.getSourceTableId()) == null) {
                LOGGER.info("Table {} is new to be monitored by capture instance {}", db2ChangeTable4.getSourceTableId(), db2ChangeTable4.getCaptureInstance());
                this.dispatcher.dispatchSchemaChangeEvent(db2ChangeTable4.getSourceTableId(), new Db2SchemaChangeEventEmitter(db2Partition, db2OffsetContext, db2ChangeTable4, this.dataConnection.getTableSchemaFromTable(db2ChangeTable4), SchemaChangeEvent.SchemaChangeEventType.CREATE));
            }
            arrayList.add(db2ChangeTable4);
        }
        return (Db2ChangeTable[]) arrayList.toArray(new Db2ChangeTable[arrayList.size()]);
    }
}
