package io.debezium.connector.oracle.logminer;

import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.RedoThreadState;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.function.Predicates;
import io.debezium.util.DelayStrategy;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/LogFileCollector.class */
public class LogFileCollector {
    private static final Logger LOGGER = LoggerFactory.getLogger(LogFileCollector.class);
    // Status value that, compared case-insensitively, marks an online redo log as the current one.
    private static final String STATUS_CURRENT = "CURRENT";
    // Log type values compared against the type column of the logs query result.
    private static final String ONLINE_LOG_TYPE = "ONLINE";
    private static final String ARCHIVE_LOG_TYPE = "ARCHIVED";
    // Initial and maximum delay of the exponential back-off used while retrying in getLogs.
    private final Duration initialDelay;
    private final Duration maxRetryDelay;
    // Upper bound of the retry loop counter in getLogs (the loop runs from 0 to maxAttempts inclusive).
    private final int maxAttempts;
    // The next three values are passed unchanged to SqlUtils.allMinableLogsQuery when building the logs query.
    private final Duration archiveLogRetention;
    private final boolean archiveLogOnlyMode;
    private final String archiveLogDestinationName;
    // Connection used to read the redo thread state and to run the logs query.
    private final OracleConnection connection;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Immutable holder for the lowest and highest log sequence numbers found in a set of logs.
     */
    @Immutable
    public static class SequenceRange {
        private final long min;
        private final long max;

        /**
         * @param min the lowest sequence number of the range
         * @param max the highest sequence number of the range
         */
        SequenceRange(long min, long max) {
            this.min = min;
            this.max = max;
        }

        /**
         * @return the lowest sequence number of the range
         */
        public long getMin() {
            return min;
        }

        /**
         * @return the highest sequence number of the range
         */
        public long getMax() {
            return max;
        }
    }

    /**
     * Creates a collector that reads its retry and archive-log settings from the connector
     * configuration and uses the given connection for all database access.
     *
     * @param oracleConnectorConfig the connector configuration supplying delays, retry count and archive log settings
     * @param oracleConnection the connection used to query redo thread state and log files
     */
    public LogFileCollector(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection) {
        this.connection = oracleConnection;

        // Retry behaviour of getLogs.
        this.initialDelay = oracleConnectorConfig.getLogMiningInitialDelay();
        this.maxRetryDelay = oracleConnectorConfig.getLogMiningMaxDelay();
        this.maxAttempts = oracleConnectorConfig.getMaximumNumberOfLogQueryRetries();

        // Inputs of the logs query.
        this.archiveLogRetention = oracleConnectorConfig.getArchiveLogRetention();
        this.archiveLogOnlyMode = oracleConnectorConfig.isArchiveLogOnlyMode();
        this.archiveLogDestinationName = oracleConnectorConfig.getArchiveLogDestinationName();
    }

    /**
     * Collects the log files needed to read from the given SCN, retrying with an exponential
     * back-off until the collected list passes the consistency check.
     *
     * <p>Each attempt reads the current redo thread state, queries the logs for the SCN and
     * validates them with {@link #isLogFileListConsistent}. The loop makes at most
     * {@code maxAttempts + 1} attempts.
     *
     * @param scn the SCN position to read from
     * @return the consistent list of log files
     * @throws SQLException if reading the redo thread state or querying the logs fails
     * @throws DebeziumException if no consistent list of logs was found within the allowed attempts
     */
    public List<LogFile> getLogs(Scn scn) throws SQLException {
        LOGGER.debug("Collecting logs based on the read SCN position {}.", scn);
        final DelayStrategy retryStrategy = DelayStrategy.exponential(this.initialDelay, this.maxRetryDelay);
        for (int attempt = 0; attempt <= this.maxAttempts; attempt++) {
            final RedoThreadState redoThreadState = this.connection.getRedoThreadState();
            for (RedoThreadState.RedoThread redoThread : redoThreadState.getThreads()) {
                LOGGER.debug("Thread {}: {}", redoThread.getThreadId(), redoThread);
            }

            final List<LogFile> logs = getLogsForOffsetScn(scn);
            if (isLogFileListConsistent(scn, logs, redoThreadState)) {
                return logs;
            }

            LOGGER.info("No logs available yet (attempt {})...", attempt + 1);
            // Only back off when another attempt follows; sleeping before the final
            // failure would just delay the exception without any chance of recovery.
            if (attempt < this.maxAttempts) {
                retryStrategy.sleepWhen(true);
            }
        }
        throw new DebeziumException(String.format("None of the log files contain offset SCN: %s, re-snapshot is required.", scn));
    }

    /**
     * Runs the logs query for the given SCN and returns the archive and online redo logs that
     * are relevant for reading from that position.
     *
     * <p>An archive log is kept when its next SCN is at or after the given SCN. An online redo
     * log is kept when it is the current log or its next SCN is at or after the given SCN.
     * Rows whose type is neither archived nor online are ignored. Archive logs that duplicate
     * an online redo log are dropped by {@link #deduplicateLogFiles}.
     *
     * @param scn the SCN position to read from
     * @return archive logs followed by online redo logs, without duplicates
     * @throws SQLException if the query fails
     */
    public List<LogFile> getLogsForOffsetScn(Scn scn) throws SQLException {
        final Collection<LogFile> onlineLogs = new LinkedHashSet<>();
        final Collection<LogFile> archiveLogs = new LinkedHashSet<>();

        this.connection.query(getLogsQuery(scn), rs -> {
            while (rs.next()) {
                final String fileName = rs.getString(1);
                final Scn firstScn = getScnFromString(rs.getString(2));
                final Scn nextScn = getScnFromString(rs.getString(3));
                final String status = rs.getString(5);
                final String type = rs.getString(6);
                final BigInteger sequence = new BigInteger(rs.getString(7));
                final int thread = rs.getInt(10);

                if (ARCHIVE_LOG_TYPE.equals(type)) {
                    final LogFile archiveLog = new LogFile(fileName, firstScn, nextScn, sequence, LogFile.Type.ARCHIVE, thread);
                    if (archiveLog.getNextScn().compareTo(scn) >= 0) {
                        LOGGER.debug("Archive log {} with SCN range {} to {} sequence {} to be added.",
                                fileName, firstScn, nextScn, sequence);
                        archiveLogs.add(archiveLog);
                    }
                    continue;
                }

                if (!ONLINE_LOG_TYPE.equals(type)) {
                    continue;
                }

                final boolean current = STATUS_CURRENT.equalsIgnoreCase(status);
                final LogFile redoLog = new LogFile(fileName, firstScn, nextScn, sequence, LogFile.Type.REDO, current, thread);
                final boolean wanted = redoLog.isCurrent() || redoLog.getNextScn().compareTo(scn) >= 0;
                if (wanted) {
                    LOGGER.debug("Online redo log {} with SCN range {} to {} ({}) sequence {} to be added.",
                            fileName, firstScn, nextScn, status, sequence);
                    onlineLogs.add(redoLog);
                }
                else {
                    LOGGER.debug("Online redo log {} with SCN range {} to {} ({}) sequence {} to be excluded.",
                            fileName, firstScn, nextScn, status, sequence);
                }
            }
        });

        return deduplicateLogFiles(archiveLogs, onlineLogs);
    }

    /**
     * Merges archive logs and online redo logs into one list, dropping every archive log that
     * is equal to one of the redo logs.
     *
     * <p>Note that duplicates are removed from the first collection in place, so it must be
     * modifiable.
     *
     * @param collection the archive logs; duplicates of redo logs are removed from this collection
     * @param collection2 the online redo logs
     * @return a new list holding the remaining archive logs followed by all redo logs
     */
    public List<LogFile> deduplicateLogFiles(Collection<LogFile> collection, Collection<LogFile> collection2) {
        for (final LogFile redoLog : collection2) {
            collection.removeIf(archiveLog -> {
                final boolean duplicate = archiveLog.equals(redoLog);
                if (duplicate) {
                    LOGGER.debug("Removing redo thread {} archive log {} with duplicate sequence {} with redo log {}",
                            archiveLog.getThread(), archiveLog.getFileName(), archiveLog.getSequence(), redoLog.getFileName());
                }
                return duplicate;
            });
        }

        final List<LogFile> merged = new ArrayList<>(collection);
        merged.addAll(collection2);
        return merged;
    }

    /**
     * Checks whether the collected logs form a consistent set for every redo thread known to
     * the database.
     *
     * <p>The logs are grouped by redo thread. Each thread in the given state is then validated
     * with the open-thread or closed-thread check, depending on whether the thread is open.
     * Logs that belong to a thread missing from the state are not validated; a warning is
     * logged once per such thread.
     *
     * @param scn the SCN position to read from
     * @param list the collected log files
     * @param redoThreadState the current redo thread state of the database
     * @return {@code true} if every known redo thread is consistent, {@code false} otherwise
     */
    public boolean isLogFileListConsistent(Scn scn, List<LogFile> list, RedoThreadState redoThreadState) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Performing consistency check on the following collected logs:");
            for (LogFile logFile : list) {
                LOGGER.debug("\tLog: {}", logFile);
            }
            LOGGER.debug("Current redo thread state:");
            for (RedoThreadState.RedoThread redoThread : redoThreadState.getThreads()) {
                LOGGER.debug("\tThread: {}", redoThread);
            }
        }

        final Map<Integer, List<LogFile>> logsByThread = list.stream()
                .collect(Collectors.groupingBy(LogFile::getThread));
        final List<Integer> databaseThreadIds = redoThreadState.getThreads().stream()
                .map(RedoThreadState.RedoThread::getThreadId)
                .collect(Collectors.toList());

        for (Integer threadId : databaseThreadIds) {
            final RedoThreadState.RedoThread redoThread = redoThreadState.getRedoThread(threadId);
            // May be null when no logs were collected for this thread; both checks handle that.
            final List<LogFile> threadLogs = logsByThread.get(threadId);
            final boolean consistent = redoThread.isOpen()
                    ? isOpenThreadConsistent(redoThread, scn, threadLogs)
                    : isClosedThreadConsistent(redoThread, scn, threadLogs);
            if (!consistent) {
                return false;
            }
        }

        // Report threads that have logs but are unknown to the database; distinct() keeps
        // this to one warning per thread rather than one per log file.
        list.stream()
                .map(LogFile::getThread)
                .distinct()
                .filter(Predicates.not(databaseThreadIds::contains))
                .forEach(this::logThreadCheckSkippedNotInDatabase);
        return true;
    }

    /**
     * Validates the logs collected for a redo thread that is currently open.
     *
     * <p>The thread is inconsistent when it is reported as disabled, when no logs were
     * collected for it, or when the relevant logs have a gap in their sequence numbers. If
     * the thread was enabled at or before the read SCN, all of its logs are checked for gaps.
     * Otherwise only the logs that contain the enabled SCN or start after it are checked, and
     * at least one such log must exist.
     *
     * @param redoThread the open redo thread being checked
     * @param scn the SCN position to read from
     * @param list the logs collected for this thread; may be {@code null} or empty
     * @return {@code true} if the thread's logs are consistent, {@code false} otherwise
     */
    private boolean isOpenThreadConsistent(RedoThreadState.RedoThread redoThread, Scn scn, List<LogFile> list) {
        final int threadId = redoThread.getThreadId().intValue();
        final Scn enabledScn = redoThread.getEnabledScn();
        final Scn checkpointScn = redoThread.getCheckpointScn();

        if (redoThread.isDisabled()) {
            logException(String.format("Redo thread %d expected to have ENABLED with value PUBLIC or PRIVATE.", threadId));
            return false;
        }

        if (list == null || list.isEmpty()) {
            logException(String.format("Redo thread %d is inconsistent; enabled SCN %s checkpoint SCN %s reading from SCN %s, no logs found.",
                    threadId, enabledScn, checkpointScn, scn));
            return false;
        }

        if (enabledScn.compareTo(scn) <= 0) {
            // Thread was enabled at or before the read position: every collected log counts.
            final Optional<Long> missingSequence = getFirstLogMissingSequence(list);
            if (missingSequence.isPresent()) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d",
                        threadId, missingSequence.get()));
                return false;
            }
            LOGGER.debug("Redo Thread {} is consistent.", threadId);
            return true;
        }

        // Thread was enabled after the read position: only logs from the enabled SCN onward matter.
        final List<LogFile> logsSinceEnabled = list.stream()
                .filter(log -> log.isScnInLogFileRange(enabledScn) || log.getFirstScn().compareTo(enabledScn) > 0)
                .collect(Collectors.toList());

        if (logsSinceEnabled.isEmpty()) {
            // Report the enabled SCN itself; the filtered list is empty here and carries no information.
            logException(String.format("Redo Thread %d is inconsistent; expected logs after enabled SCN %s",
                    threadId, enabledScn));
            return false;
        }

        final Optional<Long> missingSequence = getFirstLogMissingSequence(logsSinceEnabled);
        if (missingSequence.isPresent()) {
            logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d (enabled).",
                    threadId, missingSequence.get()));
            return false;
        }

        LOGGER.debug("Redo Thread {} is consistent after enabled SCN {} ({}).", threadId, enabledScn, redoThread.getStatus());
        return true;
    }

    /**
     * Validates the logs collected for a redo thread that is not open.
     *
     * <p>For a disabled thread, the check is skipped when there is no usable disabled SCN.
     * Otherwise, if logs were collected, the disabled SCN must not be before the read SCN and
     * the logs that contain the disabled SCN or start before it must exist and have no
     * sequence gap.
     *
     * <p>For a closed but not disabled thread, if logs were collected, the checkpoint SCN
     * must not be before the read SCN. The logs up to the checkpoint SCN (and, when the
     * thread was enabled after the read SCN, from the enabled SCN onward) must exist and
     * have no sequence gap.
     *
     * <p>In both cases a thread without collected logs is treated as consistent.
     *
     * @param redoThread the closed redo thread being checked
     * @param scn the SCN position to read from
     * @param list the logs collected for this thread; may be {@code null} or empty
     * @return {@code true} if the thread's logs are consistent, {@code false} otherwise
     */
    private boolean isClosedThreadConsistent(RedoThreadState.RedoThread redoThread, Scn scn, List<LogFile> list) {
        final int threadId = redoThread.getThreadId().intValue();
        final boolean hasLogs = list != null && !list.isEmpty();

        if (redoThread.isDisabled()) {
            final Scn disabledScn = redoThread.getDisabledScn();
            if (disabledScn.isNull() || disabledScn.asBigInteger().equals(BigInteger.ZERO)) {
                LOGGER.debug("Redo Thread {} is disabled but has no disabled SCN; consistency check skipped.", threadId);
                return true;
            }

            if (hasLogs) {
                if (disabledScn.compareTo(scn) < 0) {
                    // Thread was disabled before the read position, so no logs should have been found.
                    if (LOGGER.isDebugEnabled()) {
                        for (LogFile log : list) {
                            LOGGER.debug("Redo Thread {} log {} not expected.", threadId, log);
                        }
                    }
                    logException(String.format("Redo Thread %d disabled at SCN %s, but logs detected using SCN %s.",
                            threadId, disabledScn, scn));
                    return false;
                }

                final List<LogFile> logsUntilDisabled = list.stream()
                        .filter(log -> log.isScnInLogFileRange(disabledScn) || log.getFirstScn().compareTo(disabledScn) < 0)
                        .collect(Collectors.toList());
                if (logsUntilDisabled.isEmpty()) {
                    // Report the disabled SCN itself; the filtered list is empty here and carries no information.
                    logException(String.format("Redo Thread %d is inconsistent; expected logs before disabled SCN %s.",
                            threadId, disabledScn));
                    return false;
                }

                final Optional<Long> missingSequence = getFirstLogMissingSequence(logsUntilDisabled);
                if (missingSequence.isPresent()) {
                    logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d.",
                            threadId, missingSequence.get()));
                    return false;
                }
            }

            LOGGER.debug("Redo Thread {} is consistent after disabled SCN {} ({}).", threadId, disabledScn, redoThread.getStatus());
            return true;
        }

        final Scn checkpointScn = redoThread.getCheckpointScn();
        final Scn enabledScn = redoThread.getEnabledScn();

        if (hasLogs) {
            if (checkpointScn.compareTo(scn) < 0) {
                // Thread stopped before the read position, so no logs should have been found.
                if (LOGGER.isDebugEnabled()) {
                    for (LogFile log : list) {
                        LOGGER.debug("Read Thread {} query has log {}; not expected.", threadId, log);
                    }
                }
                logException(String.format("Redo Thread %d stopped at SCN %s, but logs detected using SCN %s.",
                        threadId, checkpointScn, scn));
                return false;
            }

            final List<LogFile> relevantLogs;
            if (enabledScn.compareTo(scn) > 0) {
                // Enabled after the read position: restrict to logs between enabled and checkpoint SCN.
                relevantLogs = list.stream()
                        .filter(log -> log.isScnInLogFileRange(enabledScn) || log.getNextScn().compareTo(enabledScn) >= 0)
                        .filter(log -> log.isScnInLogFileRange(checkpointScn) || log.getFirstScn().compareTo(checkpointScn) < 0)
                        .collect(Collectors.toList());
                if (relevantLogs.isEmpty()) {
                    logException(String.format("Redo Thread %d is inconsistent; expected logs between enabled SCN %s and checkpoint SCN %s",
                            threadId, enabledScn, checkpointScn));
                    return false;
                }
            }
            else {
                relevantLogs = list.stream()
                        .filter(log -> log.isScnInLogFileRange(checkpointScn) || log.getFirstScn().compareTo(checkpointScn) < 0)
                        .collect(Collectors.toList());
                if (relevantLogs.isEmpty()) {
                    logException(String.format("Redo Thread %d is inconsistent; expected logs before checkpoint SCN %s",
                            threadId, checkpointScn));
                    return false;
                }
            }

            final Optional<Long> missingSequence = getFirstLogMissingSequence(relevantLogs);
            if (missingSequence.isPresent()) {
                logException(String.format("Redo Thread %d is inconsistent; failed to find log with sequence %d (checkpoint).",
                        threadId, missingSequence.get()));
                return false;
            }
        }

        LOGGER.debug("Redo Thread {} is consistent before checkpoint SCN {} ({}).", threadId, checkpointScn, redoThread.getStatus());
        return true;
    }

    /**
     * Finds the lowest sequence number, between the smallest and largest sequence of the
     * given logs, for which no log exists.
     *
     * @param list the logs of a single redo thread; must not be empty
     * @return the first missing sequence number, or an empty optional if there is no gap
     */
    private Optional<Long> getFirstLogMissingSequence(List<LogFile> list) {
        final SequenceRange range = getSequenceRangeForRedoThreadLogs(list);
        for (long sequence = range.getMin(); sequence <= range.getMax(); sequence++) {
            if (!hasLogFileWithSequenceNumber(sequence, list)) {
                return Optional.of(sequence);
            }
        }
        return Optional.empty();
    }

    /**
     * Warns that logs were found for a redo thread that has no entry in the redo thread state,
     * so its consistency check was skipped.
     *
     * @param i the redo thread number
     */
    private void logThreadCheckSkippedNotInDatabase(int i) {
        final Integer threadId = i;
        LOGGER.warn("Log found for redo thread {} but no record in V$THREAD; thread consistency check skipped.", threadId);
    }

    /**
     * Builds the SQL used to fetch the minable logs for the given SCN, using this collector's
     * archive log retention, archive-log-only mode and archive destination name.
     *
     * @param scn the SCN position to read from
     * @return the query text produced by {@code SqlUtils.allMinableLogsQuery}
     */
    private String getLogsQuery(Scn scn) {
        final String query = SqlUtils.allMinableLogsQuery(
                scn,
                this.archiveLogRetention,
                this.archiveLogOnlyMode,
                this.archiveLogDestinationName);
        return query;
    }

    /**
     * Converts a textual SCN into an {@link Scn}.
     *
     * @param str the SCN text; may be {@code null} or blank
     * @return {@link Scn#MAX} when the text is null or blank, otherwise the parsed SCN
     */
    private Scn getScnFromString(String str) {
        if (Strings.isNullOrBlank(str)) {
            return Scn.MAX;
        }
        return Scn.valueOf(str);
    }

    /**
     * Tells whether any of the given logs has the given sequence number.
     *
     * @param j the sequence number to look for
     * @param list the logs to search
     * @return {@code true} if at least one log has that sequence number
     */
    private boolean hasLogFileWithSequenceNumber(long j, List<LogFile> list) {
        for (LogFile logFile : list) {
            if (logFile.getSequence().longValue() == j) {
                return true;
            }
        }
        return false;
    }

    /**
     * Computes the smallest and largest sequence numbers among the given logs.
     *
     * @param list the logs to inspect
     * @return the range spanning the lowest to the highest sequence number
     * @throws DebeziumException if the list is empty
     */
    private SequenceRange getSequenceRangeForRedoThreadLogs(List<LogFile> list) {
        if (list.isEmpty()) {
            throw new DebeziumException("Cannot calculate log sequence range, log collection is empty.");
        }

        long lowest = Long.MAX_VALUE;
        long highest = Long.MIN_VALUE;
        for (LogFile logFile : list) {
            final long sequence = logFile.getSequence().longValue();
            if (sequence < lowest) {
                lowest = sequence;
            }
            if (sequence > highest) {
                highest = sequence;
            }
        }
        return new SequenceRange(lowest, highest);
    }

    /**
     * Logs the given message at INFO level together with a freshly created
     * {@link DebeziumException} carrying the same message, so the log entry includes a stack trace.
     * The exception is only logged, never thrown.
     *
     * @param str the message to log
     */
    private static void logException(String str) {
        final DebeziumException stackTraceHolder = new DebeziumException(str);
        LOGGER.info("{}", str, stackTraceHolder);
    }
}
