package io.debezium.connector.oracle.logminer.processor;

import io.debezium.DebeziumException;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.OracleSchemaChangeEventEmitter;
import io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LobEraseEvent;
import io.debezium.connector.oracle.logminer.events.LobWriteEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.SelectLobLocatorEvent;
import io.debezium.connector.oracle.logminer.events.Transaction;
import io.debezium.connector.oracle.logminer.parser.DmlParserException;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlParser;
import io.debezium.connector.oracle.logminer.parser.SelectLobParser;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.class */
public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
    private final ChangeEventSource.ChangeEventSourceContext context;
    private final OracleConnectorConfig connectorConfig;
    private final OracleDatabaseSchema schema;
    private final OraclePartition partition;
    private final OracleOffsetContext offsetContext;
    private final EventDispatcher<TableId> dispatcher;
    private final OracleStreamingChangeEventSourceMetrics metrics;
    private final TransactionReconciliation reconciliation;
    protected final Counters counters = new Counters();
    private final LogMinerDmlParser dmlParser = new LogMinerDmlParser();
    private final SelectLobParser selectLobParser = new SelectLobParser();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor$Counters.class */
    public class Counters {
        public int stuckCount;
        public int dmlCount;
        public int ddlCount;
        public int insertCount;
        public int updateCount;
        public int deleteCount;
        public int commitCount;
        public int rollbackCount;
        public int tableMetadataCount;
        public long rows;

        protected Counters() {
        }

        public void reset() {
            this.stuckCount = 0;
            this.dmlCount = 0;
            this.ddlCount = 0;
            this.insertCount = 0;
            this.updateCount = 0;
            this.deleteCount = 0;
            this.commitCount = 0;
            this.rollbackCount = 0;
            this.tableMetadataCount = 0;
            this.rows = 0L;
        }

        public String toString() {
            return "Counters{rows=" + this.rows + ", stuckCount=" + this.stuckCount + ", dmlCount=" + this.dmlCount + ", ddlCount=" + this.ddlCount + ", insertCount=" + this.insertCount + ", updateCount=" + this.updateCount + ", deleteCount=" + this.deleteCount + ", commitCount=" + this.commitCount + ", rollbackCount=" + this.rollbackCount + ", tableMetadataCount=" + this.tableMetadataCount + '}';
        }
    }

    public AbstractLogMinerEventProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OracleConnectorConfig oracleConnectorConfig, OracleDatabaseSchema oracleDatabaseSchema, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext, EventDispatcher<TableId> eventDispatcher, OracleStreamingChangeEventSourceMetrics oracleStreamingChangeEventSourceMetrics) {
        this.context = changeEventSourceContext;
        this.connectorConfig = oracleConnectorConfig;
        this.schema = oracleDatabaseSchema;
        this.partition = oraclePartition;
        this.offsetContext = oracleOffsetContext;
        this.dispatcher = eventDispatcher;
        this.metrics = oracleStreamingChangeEventSourceMetrics;
        this.reconciliation = new TransactionReconciliation(oracleConnectorConfig, oracleDatabaseSchema);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OracleConnectorConfig getConfig() {
        return this.connectorConfig;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public OracleDatabaseSchema getSchema() {
        return this.schema;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public TransactionReconciliation getReconciliation() {
        return this.reconciliation;
    }

    protected boolean isRecentlyCommitted(String str) {
        return false;
    }

    protected boolean hasSchemaChangeBeenSeen(LogMinerEventRow logMinerEventRow) {
        return false;
    }

    protected boolean isTransactionIdAllowed(String str) {
        return true;
    }

    protected abstract TransactionCache<?> getTransactionCache();

    protected boolean isTrxIdRawValue() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processResults(ResultSet resultSet) throws SQLException, InterruptedException {
        while (this.context.isRunning() && hasNextWithMetricsUpdate(resultSet)) {
            this.counters.rows++;
            processRow(LogMinerEventRow.fromResultSet(resultSet, getConfig().getCatalogName(), isTrxIdRawValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processRow(LogMinerEventRow logMinerEventRow) throws SQLException, InterruptedException {
        switch (logMinerEventRow.getEventType()) {
            case MISSING_SCN:
                handleMissingScn(logMinerEventRow);
                break;
            case START:
                break;
            case COMMIT:
                handleCommit(logMinerEventRow);
                return;
            case ROLLBACK:
                handleRollback(logMinerEventRow);
                return;
            case DDL:
                handleSchemaChange(logMinerEventRow);
                return;
            case SELECT_LOB_LOCATOR:
                handleSelectLobLocator(logMinerEventRow);
                return;
            case LOB_WRITE:
                handleLobWrite(logMinerEventRow);
                return;
            case LOB_ERASE:
                handleLobErase(logMinerEventRow);
                return;
            case INSERT:
            case UPDATE:
            case DELETE:
                handleDataEvent(logMinerEventRow);
                return;
            default:
                return;
        }
        handleStart(logMinerEventRow);
    }

    protected void handleMissingScn(LogMinerEventRow logMinerEventRow) {
        LOGGER.warn("Missing SCN detected. {}", logMinerEventRow);
    }

    protected void handleStart(LogMinerEventRow logMinerEventRow) {
        String transactionId = logMinerEventRow.getTransactionId();
        if (getTransactionCache().get(transactionId) != null || isRecentlyCommitted(transactionId)) {
            return;
        }
        getTransactionCache().put(transactionId, new Transaction(transactionId, logMinerEventRow.getScn(), logMinerEventRow.getChangeTime()));
        this.metrics.setActiveTransactions(getTransactionCache().size());
    }

    protected abstract void handleCommit(LogMinerEventRow logMinerEventRow) throws InterruptedException;

    protected void handleRollback(LogMinerEventRow logMinerEventRow) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleSchemaChange(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        if (hasSchemaChangeBeenSeen(logMinerEventRow)) {
            LOGGER.trace("DDL: Scn {}, SQL '{}' has already been processed, skipped.", logMinerEventRow.getScn(), logMinerEventRow.getRedoSql());
            return;
        }
        LOGGER.trace("DDL: '{}' {}", logMinerEventRow.getRedoSql(), logMinerEventRow);
        if (logMinerEventRow.getTableName() != null) {
            this.counters.ddlCount++;
            TableId tableId = logMinerEventRow.getTableId();
            this.dispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(getConfig(), this.partition, this.offsetContext, tableId, tableId.catalog(), tableId.schema(), logMinerEventRow.getRedoSql(), getSchema(), logMinerEventRow.getChangeTime(), this.metrics));
        }
    }

    protected void handleSelectLobLocator(LogMinerEventRow logMinerEventRow) {
        if (!getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, SEL_LOB_LOCATOR '{}' skipped.", logMinerEventRow.getRedoSql());
            return;
        }
        LOGGER.trace("SEL_LOB_LOCATOR: {}", logMinerEventRow);
        TableId tableId = logMinerEventRow.getTableId();
        Table tableFor = getSchema().tableFor(tableId);
        if (tableFor == null) {
            LOGGER.warn("SEL_LOB_LOCATOR for table '{}' is not known, skipped.", tableId);
            return;
        }
        LogMinerDmlEntry parse = this.selectLobParser.parse(logMinerEventRow.getRedoSql(), tableFor);
        parse.setObjectName(logMinerEventRow.getTableName());
        parse.setObjectOwner(logMinerEventRow.getTablespaceName());
        addToTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow, () -> {
            return new SelectLobLocatorEvent(logMinerEventRow, parse, this.selectLobParser.getColumnName(), this.selectLobParser.isBinary());
        });
        this.metrics.incrementRegisteredDmlCount();
    }

    protected void handleLobWrite(LogMinerEventRow logMinerEventRow) {
        if (!getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, LOB_WRITE '{}' skipped", logMinerEventRow);
            return;
        }
        LOGGER.trace("LOB_WRITE: {}", logMinerEventRow);
        TableId tableId = logMinerEventRow.getTableId();
        if (getSchema().tableFor(tableId) == null) {
            LOGGER.warn("LOB_WRITE for table '{}' is not known, skipped", tableId);
        } else if (logMinerEventRow.getRedoSql() != null) {
            String parseLobWriteSql = parseLobWriteSql(logMinerEventRow.getRedoSql());
            addToTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow, () -> {
                return new LobWriteEvent(logMinerEventRow, parseLobWriteSql);
            });
        }
    }

    private void handleLobErase(LogMinerEventRow logMinerEventRow) {
        if (!getConfig().isLobEnabled()) {
            LOGGER.trace("LOB support is disabled, LOB_ERASE '{}' skipped", logMinerEventRow);
            return;
        }
        LOGGER.trace("LOB_ERASE: {}", logMinerEventRow);
        TableId tableId = logMinerEventRow.getTableId();
        if (getSchema().tableFor(tableId) == null) {
            LOGGER.warn("LOB_ERASE for table '{}' is not known, skipped", tableId);
        } else {
            addToTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow, () -> {
                return new LobEraseEvent(logMinerEventRow);
            });
        }
    }

    protected void handleDataEvent(LogMinerEventRow logMinerEventRow) throws SQLException, InterruptedException {
        if (logMinerEventRow.getRedoSql() == null) {
            return;
        }
        LOGGER.trace("DML: {}", logMinerEventRow);
        LOGGER.trace("\t{}", logMinerEventRow.getRedoSql());
        this.counters.dmlCount++;
        switch (logMinerEventRow.getEventType()) {
            case INSERT:
                this.counters.insertCount++;
                break;
            case UPDATE:
                this.counters.updateCount++;
                break;
            case DELETE:
                this.counters.deleteCount++;
                break;
        }
        TableId tableId = logMinerEventRow.getTableId();
        Table tableFor = getSchema().tableFor(tableId);
        if (tableFor == null) {
            if (!getConfig().getTableFilters().dataCollectionFilter().isIncluded(tableId)) {
                return;
            } else {
                tableFor = dispatchSchemaChangeEventAndGetTableForNewCapturedTable(tableId, this.offsetContext, this.dispatcher);
            }
        }
        if (logMinerEventRow.isRollbackFlag()) {
            Transaction transaction = getTransactionCache().get(logMinerEventRow.getTransactionId());
            if (transaction == null) {
                LOGGER.warn("Cannot undo change '{}' since transaction was not found.", logMinerEventRow);
                return;
            } else {
                transaction.removeEventWithRowId(logMinerEventRow.getRowId());
                return;
            }
        }
        LogMinerDmlEntry parseDmlStatement = parseDmlStatement(logMinerEventRow.getRedoSql(), tableFor, logMinerEventRow.getTransactionId());
        parseDmlStatement.setObjectName(logMinerEventRow.getTableName());
        parseDmlStatement.setObjectOwner(logMinerEventRow.getTablespaceName());
        addToTransaction(logMinerEventRow.getTransactionId(), logMinerEventRow, () -> {
            return new DmlEvent(logMinerEventRow, parseDmlStatement);
        });
        this.metrics.incrementRegisteredDmlCount();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void warnPotentiallyStuckScn(Scn scn, Scn scn2) {
        if (this.offsetContext == null || this.offsetContext.getCommitScn() == null) {
            return;
        }
        Scn scn3 = this.offsetContext.getScn();
        Scn commitScn = this.offsetContext.getCommitScn();
        if (!scn.equals(scn3) || scn2.equals(commitScn)) {
            this.counters.stuckCount = 0;
            return;
        }
        this.counters.stuckCount++;
        if (this.counters.stuckCount == 25) {
            LOGGER.warn("Offset SCN {} has not changed in 25 mining session iterations. This indicates long running transaction(s) are active.  Commit SCN {}.", scn, scn2);
            this.metrics.incrementScnFreezeCount();
        }
    }

    private boolean hasNextWithMetricsUpdate(ResultSet resultSet) throws SQLException {
        Instant now = Instant.now();
        if (!resultSet.next()) {
            return false;
        }
        this.metrics.addCurrentResultSetNext(Duration.between(now, Instant.now()));
        return true;
    }

    protected void addToTransaction(String str, LogMinerEventRow logMinerEventRow, Supplier<LogMinerEvent> supplier) {
        if (isTransactionIdAllowed(str)) {
            Transaction transaction = getTransactionCache().get(str);
            if (transaction == null) {
                LOGGER.trace("Transaction {} not in cache, creating.", str);
                transaction = new Transaction(str, logMinerEventRow.getScn(), logMinerEventRow.getChangeTime());
                getTransactionCache().put(str, transaction);
            }
            if (logMinerEventRow.getHash() == 0 || !transaction.getHashes().contains(Long.valueOf(logMinerEventRow.getHash()))) {
                if (logMinerEventRow.getHash() != 0) {
                    transaction.getHashes().add(Long.valueOf(logMinerEventRow.getHash()));
                }
                LOGGER.trace("Adding {} to transaction {} for table '{}'.", new Object[]{logMinerEventRow.getOperation(), str, logMinerEventRow.getTableId()});
                transaction.getEvents().add(supplier.get());
                this.metrics.setActiveTransactions(getTransactionCache().size());
                this.metrics.calculateLagMetrics(logMinerEventRow.getChangeTime());
            }
        }
    }

    private Table dispatchSchemaChangeEventAndGetTableForNewCapturedTable(TableId tableId, OracleOffsetContext oracleOffsetContext, EventDispatcher<TableId> eventDispatcher) throws SQLException, InterruptedException {
        LOGGER.info("Table '{}' is new and will now be captured.", tableId);
        oracleOffsetContext.event(tableId, Instant.now());
        eventDispatcher.dispatchSchemaChangeEvent(tableId, new OracleSchemaChangeEventEmitter(this.connectorConfig, this.partition, oracleOffsetContext, tableId, tableId.catalog(), tableId.schema(), getTableMetadataDdl(tableId), getSchema(), Instant.now(), this.metrics));
        return getSchema().tableFor(tableId);
    }

    private String getTableMetadataDdl(TableId tableId) throws SQLException {
        this.counters.tableMetadataCount++;
        LOGGER.info("Getting database metadata for table '{}'", tableId);
        OracleConnection oracleConnection = new OracleConnection(this.connectorConfig.getJdbcConfig(), () -> {
            return getClass().getClassLoader();
        });
        try {
            String pdbName = getConfig().getPdbName();
            if (pdbName != null) {
                oracleConnection.setSessionToPdb(pdbName);
            }
            String tableMetadataDdl = oracleConnection.getTableMetadataDdl(tableId);
            oracleConnection.close();
            return tableMetadataDdl;
        } catch (Throwable th) {
            try {
                oracleConnection.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private LogMinerDmlEntry parseDmlStatement(String str, Table table, String str2) {
        try {
            Instant now = Instant.now();
            LogMinerDmlEntry parse = this.dmlParser.parse(str, table, str2);
            this.metrics.addCurrentParseTime(Duration.between(now, Instant.now()));
            if (parse.getOldValues().length == 0 && (EventType.UPDATE == parse.getEventType() || EventType.DELETE == parse.getEventType())) {
                LOGGER.warn("The DML event '{}' contained no before state.", str);
                this.metrics.incrementWarningCount();
            }
            return parse;
        } catch (DmlParserException e) {
            throw new DmlParserException("DML statement couldn't be parsed. Please open a Jira issue with the statement '" + str + "'.", e);
        }
    }

    private String parseLobWriteSql(String str) {
        if (str == null) {
            return null;
        }
        int indexOf = str.indexOf(":= '");
        if (indexOf != -1) {
            return str.substring(indexOf + 4, str.lastIndexOf("'"));
        }
        int indexOf2 = str.indexOf(":= HEXTORAW");
        if (indexOf2 == -1) {
            throw new DebeziumException("Unable to parse unsupported LOB_WRITE SQL: " + str);
        }
        return str.substring(indexOf2 + 3, str.lastIndexOf("'") + 2);
    }
}
