package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.processor.LogMinerEventProcessor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import java.math.BigInteger;
import java.sql.CallableStatement;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.class */
public class LogMinerStreamingChangeEventSource implements StreamingChangeEventSource<OraclePartition, OracleOffsetContext> {
    private static final int MAXIMUM_NAME_LENGTH = 30;
    private static final String ALL_COLUMN_LOGGING = "ALL COLUMN LOGGING";
    private static final int MINING_START_RETRIES = 5;
    private final OracleConnection jdbcConnection;
    private final EventDispatcher<OraclePartition, TableId> dispatcher;
    private final Clock clock;
    private final OracleDatabaseSchema schema;
    private final JdbcConfiguration jdbcConfiguration;
    private final OracleConnectorConfig.LogMiningStrategy strategy;
    private final ErrorHandler errorHandler;
    private final LogMinerStreamingChangeEventSourceMetrics streamingMetrics;
    private final OracleConnectorConfig connectorConfig;
    private final Duration archiveLogRetention;
    private final boolean archiveLogOnlyMode;
    private final String archiveDestinationName;
    private final LogFileCollector logCollector;
    private final boolean continuousMining;
    private Scn startScn;
    private Scn endScn = Scn.NULL;
    private Scn snapshotScn;
    private List<LogFile> currentLogFiles;
    private List<BigInteger> currentRedoLogSequences;
    private OracleOffsetContext effectiveOffset;
    private int currentBatchSize;
    private long currentSleepTime;
    private final SnapshotterService snapshotterService;
    private static final Logger LOGGER = LoggerFactory.getLogger(LogMinerStreamingChangeEventSource.class);
    private static final Long SMALL_REDO_LOG_WARNING = 524288000L;

    public LogMinerStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, Configuration configuration, LogMinerStreamingChangeEventSourceMetrics logMinerStreamingChangeEventSourceMetrics, SnapshotterService snapshotterService) {
        this.jdbcConnection = oracleConnection;
        this.dispatcher = eventDispatcher;
        this.clock = clock;
        this.schema = oracleDatabaseSchema;
        this.connectorConfig = oracleConnectorConfig;
        this.strategy = oracleConnectorConfig.getLogMiningStrategy();
        this.errorHandler = errorHandler;
        this.streamingMetrics = logMinerStreamingChangeEventSourceMetrics;
        this.jdbcConfiguration = JdbcConfiguration.adapt(configuration);
        this.archiveLogRetention = oracleConnectorConfig.getArchiveLogRetention();
        this.archiveLogOnlyMode = oracleConnectorConfig.isArchiveLogOnlyMode();
        this.archiveDestinationName = oracleConnectorConfig.getArchiveLogDestinationName();
        this.currentBatchSize = oracleConnectorConfig.getLogMiningBatchSizeDefault();
        this.currentSleepTime = oracleConnectorConfig.getLogMiningSleepTimeDefault().toMillis();
        this.continuousMining = oracleConnectorConfig.isLogMiningContinuousMining();
        this.snapshotterService = snapshotterService;
        this.streamingMetrics.setBatchSize(this.currentBatchSize);
        this.streamingMetrics.setSleepTime(this.currentSleepTime);
        this.logCollector = new LogFileCollector(oracleConnectorConfig, oracleConnection);
    }

    public void init(OracleOffsetContext oracleOffsetContext) throws InterruptedException {
        this.effectiveOffset = oracleOffsetContext == null ? emptyContext() : oracleOffsetContext;
    }

    private OracleOffsetContext emptyContext() {
        return OracleOffsetContext.create().logicalName(this.connectorConfig).snapshotPendingTransactions(Collections.emptyMap()).transactionContext(new TransactionContext()).incrementalSnapshotContext(new SignalBasedIncrementalSnapshotContext()).build();
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) {
        try {
            try {
                prepareConnection(false);
                this.effectiveOffset = oracleOffsetContext;
                this.startScn = this.connectorConfig.getAdapter().getOffsetScn(this.effectiveOffset);
                this.snapshotScn = oracleOffsetContext.getSnapshotScn();
                Scn orElseThrow = this.jdbcConnection.getFirstScnInLogs(this.archiveLogRetention, this.archiveDestinationName).orElseThrow(() -> {
                    return new DebeziumException("Failed to calculate oldest SCN available in logs");
                });
                if (this.startScn.compareTo(this.snapshotScn) == 0) {
                    computeStartScnForFirstMiningSession(oracleOffsetContext, orElseThrow);
                }
                LogWriterFlushStrategy resolveFlushStrategy = resolveFlushStrategy();
                try {
                    if (!this.continuousMining && this.startScn.compareTo(orElseThrow.subtract(Scn.ONE)) < 0) {
                        throw new DebeziumException("Online REDO LOG files or archive log files do not contain the offset scn " + String.valueOf(this.startScn) + ".  Please perform a new snapshot.");
                    }
                    checkDatabaseAndTableState(this.jdbcConnection, this.connectorConfig.getPdbName(), this.schema);
                    logOnlineRedoLogSizes(this.connectorConfig);
                    LogMinerEventProcessor createProcessor = createProcessor(changeEventSourceContext, oraclePartition, oracleOffsetContext);
                    try {
                        if (this.archiveLogOnlyMode && !waitForStartScnInArchiveLogs(changeEventSourceContext, this.startScn)) {
                            if (createProcessor != null) {
                                createProcessor.close();
                            }
                            if (resolveFlushStrategy != null) {
                                resolveFlushStrategy.close();
                            }
                            LOGGER.info("startScn={}, endScn={}", this.startScn, this.endScn);
                            LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
                            LOGGER.info("Offsets: {}", oracleOffsetContext);
                            return;
                        }
                        initializeRedoLogsForMining(this.jdbcConnection, false, this.startScn);
                        int i = 1;
                        Stopwatch start = Stopwatch.accumulating().start();
                        while (changeEventSourceContext.isRunning()) {
                            this.streamingMetrics.setDatabaseTimeDifference(getDatabaseSystemTime(this.jdbcConnection));
                            if (this.archiveLogOnlyMode && !waitForStartScnInArchiveLogs(changeEventSourceContext, this.startScn)) {
                                break;
                            }
                            Instant now = Instant.now();
                            this.endScn = calculateUpperBounds(this.startScn, this.endScn);
                            if (this.endScn.isNull()) {
                                LOGGER.debug("Requested delay of mining by one iteration");
                                pauseBetweenMiningSessions();
                            } else if (this.archiveLogOnlyMode && this.startScn.equals(this.endScn)) {
                                pauseBetweenMiningSessions();
                            } else {
                                resolveFlushStrategy.flush(this.jdbcConnection.getCurrentScn());
                                boolean z = false;
                                if (this.connectorConfig.getLogMiningMaximumSession().isPresent()) {
                                    if (start.stop().durations().statistics().getTotal().toMillis() >= this.connectorConfig.getLogMiningMaximumSession().get().toMillis()) {
                                        LOGGER.info("LogMiner session has exceeded maximum session time of '{}', forcing restart.", this.connectorConfig.getLogMiningMaximumSession());
                                        z = true;
                                    } else {
                                        start.start();
                                    }
                                }
                                if (z || hasLogSwitchOccurred()) {
                                    endMiningSession(this.jdbcConnection, oracleOffsetContext);
                                    if (this.connectorConfig.isLogMiningRestartConnection()) {
                                        prepareConnection(true);
                                    }
                                    initializeRedoLogsForMining(this.jdbcConnection, true, this.startScn);
                                    start = Stopwatch.accumulating().start();
                                }
                                if (changeEventSourceContext.isRunning()) {
                                    if (startMiningSession(this.jdbcConnection, this.startScn, this.endScn, i)) {
                                        i = 1;
                                        this.startScn = createProcessor.process(this.startScn, this.endScn);
                                        this.streamingMetrics.setLastBatchProcessingDuration(Duration.between(now, Instant.now()));
                                        captureSessionMemoryStatistics(this.jdbcConnection);
                                    } else {
                                        i++;
                                    }
                                    pauseBetweenMiningSessions();
                                }
                                if (changeEventSourceContext.isPaused()) {
                                    LOGGER.info("Streaming will now pause");
                                    changeEventSourceContext.streamingPaused();
                                    changeEventSourceContext.waitSnapshotCompletion();
                                    LOGGER.info("Streaming resumed");
                                }
                            }
                        }
                        if (createProcessor != null) {
                            createProcessor.close();
                        }
                        if (resolveFlushStrategy != null) {
                            resolveFlushStrategy.close();
                        }
                        LOGGER.info("startScn={}, endScn={}", this.startScn, this.endScn);
                        LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
                        LOGGER.info("Offsets: {}", oracleOffsetContext);
                    } catch (Throwable th) {
                        if (createProcessor != null) {
                            try {
                                createProcessor.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (resolveFlushStrategy != null) {
                        try {
                            resolveFlushStrategy.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                LOGGER.info("startScn={}, endScn={}", this.startScn, this.endScn);
                LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
                LOGGER.info("Offsets: {}", oracleOffsetContext);
                throw th5;
            }
        } catch (Throwable th6) {
            LOGGER.error("Mining session stopped due to error.", th6);
            this.streamingMetrics.incrementErrorCount();
            this.errorHandler.setProducerThrowable(th6);
            LOGGER.info("startScn={}, endScn={}", this.startScn, this.endScn);
            LOGGER.info("Streaming metrics dump: {}", this.streamingMetrics.toString());
            LOGGER.info("Offsets: {}", oracleOffsetContext);
        }
    }

    private void prepareConnection(boolean z) throws SQLException {
        if (z) {
            LOGGER.debug("Log switch or maximum session threshold detected, restarting Oracle JDBC connection.");
            this.jdbcConnection.close();
        }
        this.jdbcConnection.setAutoCommit(false);
        setNlsSessionParameters(this.jdbcConnection);
    }

    private void logOnlineRedoLogSizes(OracleConnectorConfig oracleConnectorConfig) throws SQLException {
        this.jdbcConnection.query("SELECT GROUP#, BYTES FROM V$LOG ORDER BY 1", resultSet -> {
            LOGGER.info("Redo Log Group Sizes:");
            boolean z = false;
            while (resultSet.next()) {
                long j = resultSet.getLong(2);
                if (j < SMALL_REDO_LOG_WARNING.longValue()) {
                    z = true;
                }
                LOGGER.info("\tGroup #{}: {} bytes", Integer.valueOf(resultSet.getInt(1)), Long.valueOf(j));
            }
            if (oracleConnectorConfig.getAdapter().getType().equals(LogMinerAdapter.TYPE) && oracleConnectorConfig.getLogMiningStrategy() == OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO && z) {
                LOGGER.warn("Redo logs may be sized too small using the default mining strategy, consider increasing redo log sizes to a minimum of 500MB.");
            }
        });
    }

    private void computeStartScnForFirstMiningSession(OracleOffsetContext oracleOffsetContext, Scn scn) {
        Map<String, Scn> snapshotPendingTransactions = oracleOffsetContext.getSnapshotPendingTransactions();
        if (snapshotPendingTransactions == null || snapshotPendingTransactions.isEmpty()) {
            this.startScn = this.snapshotScn;
        } else {
            Scn scn2 = this.snapshotScn;
            for (Map.Entry<String, Scn> entry : snapshotPendingTransactions.entrySet()) {
                String key = entry.getKey();
                Scn value = entry.getValue();
                LOGGER.info("Transaction {} was pending across snapshot boundary. Start SCN = {}, snapshot SCN = {}", new Object[]{key, value, this.startScn});
                if (value.compareTo(scn) < 0) {
                    LOGGER.warn("Transaction {} was still ongoing while snapshot was taken, but is no longer completely recorded in the archive logs. Events will be lost. Oldest SCN in logs = {}, TX start SCN = {}", new Object[]{key, scn, value});
                    scn2 = scn;
                } else if (value.compareTo(scn2) < 0) {
                    scn2 = value;
                }
            }
            if (oracleOffsetContext.getCommitScn().compareTo(this.snapshotScn) < 0) {
                LOGGER.info("Setting commit SCN to {} (snapshot SCN - 1) to ensure we don't double-emit events from pre-snapshot transactions.", this.snapshotScn.subtract(Scn.ONE));
                oracleOffsetContext.getCommitScn().setCommitScnOnAllThreads(this.snapshotScn.subtract(Scn.ONE));
            }
            if (scn2.compareTo(this.startScn) < 0) {
                LOGGER.info("Resetting start SCN from {} (snapshot SCN) to {} (start of oldest complete pending transaction)", this.startScn, scn2);
                this.startScn = scn2.subtract(Scn.ONE);
            }
        }
        oracleOffsetContext.setScn(this.startScn);
    }

    private void captureSessionMemoryStatistics(OracleConnection oracleConnection) throws SQLException {
        this.streamingMetrics.setUserGlobalAreaMemory(oracleConnection.getSessionStatisticByName("session uga memory").longValue(), oracleConnection.getSessionStatisticByName("session uga memory max").longValue());
        this.streamingMetrics.setProcessGlobalAreaMemory(oracleConnection.getSessionStatisticByName("session pga memory").longValue(), oracleConnection.getSessionStatisticByName("session pga memory max").longValue());
        DecimalFormat decimalFormat = new DecimalFormat("#.##");
        LOGGER.debug("Oracle Session UGA {}MB (max = {}MB), PGA {}MB (max = {}MB)", new Object[]{decimalFormat.format((((float) r0) / 1024.0f) / 1024.0f), decimalFormat.format((((float) r0) / 1024.0f) / 1024.0f), decimalFormat.format((((float) r0) / 1024.0f) / 1024.0f), decimalFormat.format((((float) r0) / 1024.0f) / 1024.0f)});
    }

    private LogMinerEventProcessor createProcessor(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OraclePartition oraclePartition, OracleOffsetContext oracleOffsetContext) {
        return this.connectorConfig.getLogMiningBufferType().createProcessor(changeEventSourceContext, this.connectorConfig, this.jdbcConnection, this.dispatcher, oraclePartition, oracleOffsetContext, this.schema, this.streamingMetrics);
    }

    private void initializeRedoLogsForMining(OracleConnection oracleConnection, boolean z, Scn scn) throws SQLException {
        if (!this.continuousMining) {
            oracleConnection.removeAllLogFilesFromLogMinerSession();
        }
        if ((!z || !this.continuousMining) && OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(this.strategy)) {
            buildDataDictionary(oracleConnection);
        }
        if (!this.continuousMining) {
            this.currentLogFiles = this.logCollector.getLogs(scn);
            for (LogFile logFile : this.currentLogFiles) {
                LOGGER.trace("Adding log file {} to the mining session.", logFile.getFileName());
                CallableStatement prepareCall = oracleConnection.connection(false).prepareCall(SqlUtils.addLogFileStatement("DBMS_LOGMNR.ADDFILE", logFile.getFileName()));
                try {
                    prepareCall.execute();
                    if (prepareCall != null) {
                        prepareCall.close();
                    }
                } catch (Throwable th) {
                    if (prepareCall != null) {
                        try {
                            prepareCall.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
            this.currentRedoLogSequences = getCurrentLogFileSequences(this.currentLogFiles);
        }
        updateRedoLogMetrics();
    }

    private List<BigInteger> getCurrentLogFileSequences(List<LogFile> list) {
        return (list == null || list.isEmpty()) ? Collections.emptyList() : (List) list.stream().filter((v0) -> {
            return v0.isCurrent();
        }).map((v0) -> {
            return v0.getSequence();
        }).collect(Collectors.toList());
    }

    private Scn getMaxArchiveLogScn(List<LogFile> list) {
        if (list == null || list.isEmpty()) {
            throw new DebeziumException("Cannot get maximum archive log SCN as no logs were available.");
        }
        List list2 = (List) list.stream().filter(logFile -> {
            return logFile.getType().equals(LogFile.Type.ARCHIVE);
        }).collect(Collectors.toList());
        if (list2.isEmpty()) {
            throw new DebeziumException("Cannot get maximum archive log SCN as no archive logs are present.");
        }
        Scn nextScn = ((LogFile) list2.get(0)).getNextScn();
        for (int i = 1; i < list2.size(); i++) {
            Scn nextScn2 = ((LogFile) list2.get(i)).getNextScn();
            if (nextScn2.compareTo(nextScn) > 0) {
                nextScn = nextScn2;
            }
        }
        LOGGER.debug("Maximum archive log SCN resolved as {}", nextScn);
        return nextScn;
    }

    private void buildDataDictionary(OracleConnection oracleConnection) throws SQLException {
        LOGGER.trace("Building data dictionary");
        oracleConnection.executeWithoutCommitting(new String[]{"BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;"});
    }

    private boolean hasLogSwitchOccurred() throws SQLException {
        List<BigInteger> currentRedoLogSequences = getCurrentRedoLogSequences();
        if (currentRedoLogSequences.equals(this.currentRedoLogSequences)) {
            return false;
        }
        LOGGER.debug("Current log sequence(s) is now {}, was {}", currentRedoLogSequences, this.currentRedoLogSequences);
        this.currentRedoLogSequences = currentRedoLogSequences;
        this.streamingMetrics.setSwitchCount(((Integer) this.jdbcConnection.queryAndMap(SqlUtils.switchHistoryQuery(this.archiveDestinationName), resultSet -> {
            if (resultSet.next()) {
                return Integer.valueOf(resultSet.getInt(2));
            }
            return 0;
        })).intValue());
        return true;
    }

    private void updateRedoLogMetrics() throws SQLException {
        Map<String, String> map = (Map) this.jdbcConnection.queryAndMap(SqlUtils.redoLogStatusQuery(), resultSet -> {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            while (resultSet.next()) {
                linkedHashMap.put(resultSet.getString(1), resultSet.getString(2));
            }
            return linkedHashMap;
        });
        this.streamingMetrics.setCurrentLogFileNames(getCurrentRedoLogFiles(this.jdbcConnection));
        this.streamingMetrics.setRedoLogStatuses(map);
    }

    private Set<String> getCurrentRedoLogFiles(OracleConnection oracleConnection) throws SQLException {
        HashSet hashSet = new HashSet();
        oracleConnection.query(SqlUtils.currentRedoNameQuery(), resultSet -> {
            while (resultSet.next()) {
                hashSet.add(resultSet.getString(1));
            }
        });
        LOGGER.trace("Current redo log filenames: {}", hashSet);
        return hashSet;
    }

    private List<BigInteger> getCurrentRedoLogSequences() throws SQLException {
        return (List) this.jdbcConnection.queryAndMap(SqlUtils.currentRedoLogSequenceQuery(), resultSet -> {
            ArrayList arrayList = new ArrayList();
            while (resultSet.next()) {
                arrayList.add(new BigInteger(resultSet.getString(1)));
            }
            return arrayList;
        });
    }

    private void pauseBetweenMiningSessions() throws InterruptedException {
        Metronome.sleeper(Duration.ofMillis(this.streamingMetrics.getSleepTimeInMilliseconds()), this.clock).pause();
    }

    private void setNlsSessionParameters(OracleConnection oracleConnection) throws SQLException {
        oracleConnection.executeWithoutCommitting(new String[]{"ALTER SESSION SET   NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'  NLS_TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF9'  NLS_TIMESTAMP_TZ_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM'  NLS_NUMERIC_CHARACTERS = '.,'"});
        oracleConnection.executeWithoutCommitting(new String[]{"ALTER SESSION SET TIME_ZONE = '00:00'"});
    }

    private OffsetDateTime getDatabaseSystemTime(OracleConnection oracleConnection) throws SQLException {
        return (OffsetDateTime) oracleConnection.singleOptionalValue("SELECT SYSTIMESTAMP FROM DUAL", resultSet -> {
            return (OffsetDateTime) resultSet.getObject(1, OffsetDateTime.class);
        });
    }

    public boolean startMiningSession(OracleConnection oracleConnection, Scn scn, Scn scn2, int i) throws SQLException {
        LOGGER.debug("Starting mining session startScn={}, endScn={}, strategy={}, continuous={}, attempts={}/{}", new Object[]{scn, scn2, this.strategy, Boolean.valueOf(this.continuousMining), Integer.valueOf(i), 5});
        try {
            Instant now = Instant.now();
            oracleConnection.executeWithoutCommitting(new String[]{SqlUtils.startLogMinerStatement(scn.add(Scn.ONE), scn2, this.strategy, this.continuousMining)});
            this.streamingMetrics.setLastMiningSessionStartDuration(Duration.between(now, Instant.now()));
            return true;
        } catch (SQLException e) {
            LogMinerDatabaseStateWriter.writeLogMinerStartParameters(oracleConnection);
            if (e.getErrorCode() == 1291 || e.getMessage().startsWith("ORA-01291")) {
                if (i <= 5) {
                    LOGGER.warn("Failed to start Oracle LogMiner session, retrying...");
                    return false;
                }
                LOGGER.error("Failed to start Oracle LogMiner after '{}' attempts.", 5, e);
                LogMinerDatabaseStateWriter.writeLogMinerLogFailures(oracleConnection);
            }
            LOGGER.error("Got exception when starting mining session.", e);
            LogMinerDatabaseStateWriter.write(oracleConnection);
            throw e;
        }
    }

    public void endMiningSession(OracleConnection oracleConnection, OracleOffsetContext oracleOffsetContext) throws SQLException {
        try {
            LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}", new Object[]{this.startScn, this.endScn, oracleOffsetContext.getScn(), this.strategy, Boolean.valueOf(this.continuousMining)});
            oracleConnection.executeWithoutCommitting(new String[]{"BEGIN SYS.DBMS_LOGMNR.END_LOGMNR(); END;"});
        } catch (SQLException e) {
            if (!e.getMessage().toUpperCase().contains("ORA-01307")) {
                throw e;
            }
            LOGGER.info("LogMiner mining session is already closed.");
        }
    }

    private void updateBatchSize(boolean z) {
        int logMiningBatchSizeMin = this.connectorConfig.getLogMiningBatchSizeMin();
        int logMiningBatchSizeMax = this.connectorConfig.getLogMiningBatchSizeMax();
        if (z && this.currentBatchSize < logMiningBatchSizeMax) {
            this.currentBatchSize = Math.min(this.currentBatchSize + logMiningBatchSizeMin, logMiningBatchSizeMax);
            if (this.currentBatchSize == logMiningBatchSizeMax) {
                LOGGER.info("The connector is now using the maximum batch size {} when querying the LogMiner view.{}", Integer.valueOf(this.currentBatchSize), this.connectorConfig.isLobEnabled() ? "" : " This could be indicate of a large SCN gap.");
            }
        } else if (!z && this.currentBatchSize > logMiningBatchSizeMin) {
            this.currentBatchSize = Math.max(this.currentBatchSize - logMiningBatchSizeMin, logMiningBatchSizeMin);
        }
        if (this.currentBatchSize != logMiningBatchSizeMax) {
            LOGGER.debug("Updated batch size window, using batch size {}", Integer.valueOf(this.currentBatchSize));
        }
        this.streamingMetrics.setBatchSize(this.currentBatchSize);
    }

    private void updateSleepTime(boolean z) {
        if (z && this.currentSleepTime < this.connectorConfig.getLogMiningSleepTimeMax().toMillis()) {
            this.currentSleepTime += this.connectorConfig.getLogMiningSleepTimeIncrement().toMillis();
        } else if (this.currentSleepTime > this.connectorConfig.getLogMiningSleepTimeMin().toMillis()) {
            this.currentSleepTime -= this.connectorConfig.getLogMiningSleepTimeIncrement().toMillis();
        }
        LOGGER.debug("Updated sleep time window, using sleep time {}.", Long.valueOf(this.currentSleepTime));
        this.streamingMetrics.setSleepTime(this.currentSleepTime);
    }

    private Scn calculateUpperBounds(Scn scn, Scn scn2) throws SQLException {
        Scn maxArchiveLogScn = this.archiveLogOnlyMode ? getMaxArchiveLogScn(this.currentLogFiles) : this.jdbcConnection.getCurrentScn();
        this.streamingMetrics.setCurrentScn(this.jdbcConnection.getCurrentScn());
        Scn add = scn.add(Scn.valueOf(this.streamingMetrics.getBatchSize()));
        Scn valueOf = Scn.valueOf(this.connectorConfig.getLogMiningBatchSizeDefault());
        Scn scn3 = add;
        boolean z = false;
        if (add.subtract(maxArchiveLogScn).compareTo(valueOf) > 0) {
            updateBatchSize(false);
            z = true;
        }
        if (maxArchiveLogScn.subtract(add).compareTo(valueOf) > 0) {
            updateBatchSize(true);
        }
        if (maxArchiveLogScn.compareTo(add) < 0) {
            if (!z) {
                updateSleepTime(true);
            }
            LOGGER.debug("Batch upper bounds {} exceeds maximum read position, capping to {}.", add, maxArchiveLogScn);
            scn3 = maxArchiveLogScn;
        } else if (scn2.isNull() || add.compareTo(scn2) > 0) {
            updateSleepTime(false);
            if (add.compareTo(scn) < 0) {
                LOGGER.debug("Batch upper bounds {} is before start SCN {}, fallback to maximum read position {}.", new Object[]{add, scn, maxArchiveLogScn});
                scn3 = maxArchiveLogScn;
            } else if (!scn2.isNull() && maxArchiveLogScn.subtract(scn2).compareTo(Scn.valueOf(this.connectorConfig.getLogMiningScnGapDetectionGapSizeMin())) > 0) {
                Optional<Instant> scnToTimestamp = this.jdbcConnection.getScnToTimestamp(scn2);
                if (scnToTimestamp.isPresent()) {
                    Optional<Instant> scnToTimestamp2 = this.jdbcConnection.getScnToTimestamp(maxArchiveLogScn);
                    if (scnToTimestamp2.isPresent() && ChronoUnit.MILLIS.between(scnToTimestamp.get(), scnToTimestamp2.get()) < this.connectorConfig.getLogMiningScnGapDetectionTimeIntervalMaxMs()) {
                        LOGGER.warn("Detected possible SCN gap, using upperBounds SCN, startSCN {}, prevEndSCN {}, timestamp {}, upperBounds SCN {} timestamp {}.", new Object[]{scn, scn2, scnToTimestamp.get(), maxArchiveLogScn, scnToTimestamp2.get()});
                        scn3 = maxArchiveLogScn;
                    }
                }
            }
        } else {
            LOGGER.debug("Batch size upper bounds {} too small, using maximum read position {} instead.", add, maxArchiveLogScn);
            scn3 = maxArchiveLogScn;
        }
        Duration logMiningMaxScnDeviation = this.connectorConfig.getLogMiningMaxScnDeviation();
        if (!logMiningMaxScnDeviation.isZero()) {
            Optional<Scn> calculateDeviatedEndScn = calculateDeviatedEndScn(scn, scn3, logMiningMaxScnDeviation);
            if (calculateDeviatedEndScn.isEmpty()) {
                return Scn.NULL;
            }
            LOGGER.debug("Adjusted upper bounds {} based on deviation to {}.", scn3, calculateDeviatedEndScn.get());
            scn3 = calculateDeviatedEndScn.get();
        }
        Scn scn4 = (Scn) this.jdbcConnection.getRedoThreadState().getThreads().stream().filter((v0) -> {
            return v0.isOpen();
        }).map((v0) -> {
            return v0.getLastRedoScn();
        }).min((v0, v1) -> {
            return v0.compareTo(v1);
        }).orElse(Scn.NULL);
        if (!scn4.isNull() && scn4.compareTo(scn3) < 0) {
            if (scn4.compareTo(scn) < 0) {
                return Scn.NULL;
            }
            LOGGER.debug("Adjusting upper bounds {} to minimum read thread flush SCN {}.", scn3, scn4);
            scn3 = scn4;
        }
        if (scn3.compareTo(scn) <= 0) {
            LOGGER.debug("Final upper bounds {} matches start read position, delay required.", scn3);
            return Scn.NULL;
        }
        LOGGER.debug("Final upper bounds range is {}.", scn3);
        return scn3;
    }

    private Optional<Scn> calculateDeviatedEndScn(Scn scn, Scn scn2, Duration duration) {
        if (this.archiveLogOnlyMode) {
            return Optional.of(scn2);
        }
        Optional<Scn> deviatedMaxScn = getDeviatedMaxScn(scn2, duration);
        if (deviatedMaxScn.isEmpty() || deviatedMaxScn.get().isNull()) {
            LOGGER.warn("Mining session end SCN deviation calculation is outside undo space, using upperbounds {}. If this continues, consider lowering the value of the '{}' configuration property.", scn2, OracleConnectorConfig.LOG_MINING_MAX_SCN_DEVIATION_MS.name());
            return Optional.of(scn2);
        }
        if (deviatedMaxScn.get().compareTo(scn) > 0) {
            return deviatedMaxScn;
        }
        LOGGER.debug("Mining session end SCN deviation as {}, outside of mining range, recalculating.", deviatedMaxScn.get());
        return Optional.empty();
    }

    private Optional<Scn> getDeviatedMaxScn(Scn scn, Duration duration) {
        try {
            Optional<Instant> scnToTimestamp = this.jdbcConnection.getScnToTimestamp(this.jdbcConnection.getCurrentScn());
            Optional<Instant> scnToTimestamp2 = this.jdbcConnection.getScnToTimestamp(scn);
            if (!scnToTimestamp.isPresent() || !scnToTimestamp2.isPresent() || Duration.between(scnToTimestamp2.get(), scnToTimestamp.get()).compareTo(duration) < 0) {
                return Optional.of(this.jdbcConnection.getScnAdjustedByTime(scn, duration));
            }
            LOGGER.trace("Upper bounds {} is within deviation period, using it.", scn);
            return Optional.of(scn);
        } catch (SQLException e) {
            LOGGER.warn("Failed to calculate deviated max SCN value from {}.", scn);
            return Optional.empty();
        }
    }

    private void checkDatabaseAndTableState(OracleConnection oracleConnection, String str, OracleDatabaseSchema oracleDatabaseSchema) throws SQLException {
        Instant now = Instant.now();
        LOGGER.trace("Checking database and table state, this may take time depending on the size of your schema.");
        if (str != null) {
            try {
                oracleConnection.setSessionToPdb(str);
            } finally {
                if (str != null) {
                    oracleConnection.resetSessionToCdb();
                }
            }
        }
        if (isDatabaseAllSupplementalLoggingEnabled(oracleConnection)) {
            for (TableId tableId : oracleDatabaseSchema.tableIds()) {
                Table tableFor = oracleDatabaseSchema.tableFor(tableId);
                if (tableFor == null) {
                    throw new DebeziumException("Unable to find table in relational model: " + String.valueOf(tableId));
                }
                checkTableColumnNameLengths(tableFor);
            }
        } else {
            if (!isDatabaseMinSupplementalLoggingEnabled(oracleConnection)) {
                throw new DebeziumException("Supplemental logging not properly configured. Use: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA");
            }
            for (TableId tableId2 : oracleDatabaseSchema.tableIds()) {
                if (!oracleConnection.isTableExists(tableId2)) {
                    LOGGER.warn("Database table '{}' no longer exists, supplemental log check skipped", tableId2);
                } else if (!isTableAllColumnsSupplementalLoggingEnabled(oracleConnection, tableId2)) {
                    LOGGER.warn("Database table '{}' not configured with supplemental logging \"(ALL) COLUMNS\"; only explicitly changed columns will be captured. Use: ALTER TABLE {}.{} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS", new Object[]{tableId2, tableId2.schema(), tableId2.table()});
                }
                Table tableFor2 = oracleDatabaseSchema.tableFor(tableId2);
                if (tableFor2 == null) {
                    throw new DebeziumException("Unable to find table in relational model: " + String.valueOf(tableId2));
                }
                checkTableColumnNameLengths(tableFor2);
            }
        }
        LOGGER.trace("Database and table state check finished after {} ms", Long.valueOf(Duration.between(now, Instant.now()).toMillis()));
    }

    private void checkTableColumnNameLengths(Table table) {
        if (table.id().table().length() > MAXIMUM_NAME_LENGTH) {
            LOGGER.warn("Table '{}' won't be captured by Oracle LogMiner because its name exceeds {} characters.", table.id().table(), Integer.valueOf(MAXIMUM_NAME_LENGTH));
        }
        for (Column column : table.columns()) {
            if (column.name().length() > MAXIMUM_NAME_LENGTH) {
                LOGGER.warn("Table '{}' won't be captured by Oracle LogMiner because column '{}' exceeds {} characters.", new Object[]{table.id().table(), column.name(), Integer.valueOf(MAXIMUM_NAME_LENGTH)});
            }
        }
    }

    private boolean isDatabaseAllSupplementalLoggingEnabled(OracleConnection oracleConnection) throws SQLException {
        return ((Boolean) oracleConnection.queryAndMap(SqlUtils.databaseSupplementalLoggingAllCheckQuery(), resultSet -> {
            while (resultSet.next()) {
                if ("YES".equalsIgnoreCase(resultSet.getString(2))) {
                    return true;
                }
            }
            return false;
        })).booleanValue();
    }

    private boolean isDatabaseMinSupplementalLoggingEnabled(OracleConnection oracleConnection) throws SQLException {
        return ((Boolean) oracleConnection.queryAndMap(SqlUtils.databaseSupplementalLoggingMinCheckQuery(), resultSet -> {
            while (resultSet.next()) {
                if ("YES".equalsIgnoreCase(resultSet.getString(2))) {
                    return true;
                }
            }
            return false;
        })).booleanValue();
    }

    private boolean isTableAllColumnsSupplementalLoggingEnabled(OracleConnection oracleConnection, TableId tableId) throws SQLException {
        return ((Boolean) oracleConnection.prepareQueryAndMap(SqlUtils.tableSupplementalLoggingCheckQuery(), preparedStatement -> {
            preparedStatement.setString(1, tableId.schema());
            preparedStatement.setString(2, tableId.table());
        }, resultSet -> {
            while (resultSet.next()) {
                if (ALL_COLUMN_LOGGING.equals(resultSet.getString(2))) {
                    return true;
                }
            }
            return false;
        })).booleanValue();
    }

    private LogWriterFlushStrategy resolveFlushStrategy() {
        return this.connectorConfig.isLogMiningReadOnly() ? new ReadOnlyLogWriterFlushStrategy() : this.connectorConfig.isRacSystem().booleanValue() ? new RacCommitLogWriterFlushStrategy(this.connectorConfig, this.jdbcConfiguration, this.streamingMetrics) : new CommitLogWriterFlushStrategy(this.connectorConfig, this.jdbcConnection);
    }

    private boolean waitForStartScnInArchiveLogs(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, Scn scn) throws SQLException, InterruptedException {
        boolean z = true;
        while (changeEventSourceContext.isRunning() && !isStartScnInArchiveLogs(scn)) {
            if (z) {
                LOGGER.warn("Starting SCN {} is not yet in archive logs, waiting for archive log switch.", scn);
                z = false;
                Metronome.sleeper(this.connectorConfig.getArchiveLogOnlyScnPollTime(), this.clock).pause();
            }
        }
        if (!changeEventSourceContext.isRunning()) {
            return false;
        }
        if (z) {
            return true;
        }
        LOGGER.info("Starting SCN {} is now available in archive logs, log mining unpaused.", scn);
        return true;
    }

    private boolean isStartScnInArchiveLogs(Scn scn) throws SQLException {
        try {
            return this.logCollector.getLogs(scn).stream().anyMatch(logFile -> {
                return logFile.getFirstScn().compareTo(scn) <= 0 && logFile.getNextScn().compareTo(scn) > 0 && logFile.getType().equals(LogFile.Type.ARCHIVE);
            });
        } catch (LogFileNotFoundException e) {
            return false;
        }
    }

    public void commitOffset(Map<String, ?> map, Map<String, ?> map2) {
    }

    /* renamed from: getOffsetContext, reason: merged with bridge method [inline-methods] */
    public OracleOffsetContext m47getOffsetContext() {
        return this.effectiveOffset;
    }
}
