package io.delta.kernel.internal.snapshot;

import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.InvalidTableException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.DeltaErrors;
import io.delta.kernel.internal.DeltaHistoryManager;
import io.delta.kernel.internal.DeltaLogActionUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.internal.annotation.VisibleForTesting;
import io.delta.kernel.internal.checkpoints.CheckpointInstance;
import io.delta.kernel.internal.checkpoints.Checkpointer;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.lang.ListUtils;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.replay.LogReplayUtils;
import io.delta.kernel.internal.util.FileNames;
import io.delta.kernel.internal.util.Preconditions;
import io.delta.kernel.internal.util.Tuple2;
import io.delta.kernel.utils.FileStatus;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/delta/kernel/internal/snapshot/SnapshotManager.class */
/**
 * Builds {@link Snapshot}s of a Delta table by listing the files in the table's
 * {@code _delta_log} directory, validating them, and assembling them into a {@link LogSegment}.
 *
 * <p>The manager remembers the highest-version {@link SnapshotHint} it has produced so far and
 * hands it to subsequent {@link LogReplay} instances.
 */
public class SnapshotManager {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);

    /** Hint from the highest-version snapshot created by this manager; holds null until one exists. */
    private final AtomicReference<SnapshotHint> latestSnapshotHint = new AtomicReference<>();

    private final Path tablePath;
    private final Path logPath;

    /**
     * @param tablePath root path of the table; the log path is derived as {@code <tablePath>/_delta_log}
     */
    public SnapshotManager(Path tablePath) {
        this.tablePath = tablePath;
        this.logPath = new Path(tablePath, "_delta_log");
    }

    /**
     * Builds a snapshot for the latest version found in the log.
     *
     * @param engine engine used for file listing and reading
     * @param snapshotQueryContext query context; its version is set to the resolved version
     * @return the snapshot at the latest version
     * @throws TableNotFoundException if no log files are found for the table
     */
    public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshotQueryContext)
            throws TableNotFoundException {
        final LogSegment latestSegment = getLogSegmentForVersion(engine, Optional.empty());
        snapshotQueryContext.setVersion(latestSegment.getVersion());
        return createSnapshot(latestSegment, engine, snapshotQueryContext);
    }

    /**
     * Builds a snapshot for exactly the given version.
     *
     * @param engine engine used for file listing and reading
     * @param version table version to load
     * @param snapshotQueryContext query context passed through to the snapshot
     * @return the snapshot at {@code version}
     * @throws TableNotFoundException if no log files are found for the table
     */
    public SnapshotImpl getSnapshotAt(
            Engine engine, long version, SnapshotQueryContext snapshotQueryContext)
            throws TableNotFoundException {
        final LogSegment segment = getLogSegmentForVersion(engine, Optional.of(version));
        return createSnapshot(segment, engine, snapshotQueryContext);
    }

    /**
     * Resolves a timestamp to a table version via {@link DeltaHistoryManager} and builds the
     * snapshot for that version. The resolution is timed with the context's
     * {@code timestampToVersionResolutionTimer}.
     *
     * @param engine engine used for file listing and reading
     * @param timestamp timestamp to resolve (NOTE(review): presumably millis since epoch - confirm
     *     against {@code DeltaHistoryManager.getActiveCommitAtTimestamp})
     * @param snapshotQueryContext query context; its version is set to the resolved version
     * @return the snapshot at the resolved version
     * @throws TableNotFoundException if no log files are found for the table
     */
    public Snapshot getSnapshotForTimestamp(
            Engine engine, long timestamp, SnapshotQueryContext snapshotQueryContext)
            throws TableNotFoundException {
        // NOTE(review): the meaning of the three boolean flags is defined by DeltaHistoryManager
        // and is not visible here; they are passed through unchanged.
        final long resolvedVersion =
                snapshotQueryContext.getSnapshotMetrics().timestampToVersionResolutionTimer.time(
                        () -> DeltaHistoryManager.getActiveCommitAtTimestamp(
                                        engine, this.logPath, timestamp, true, false, false)
                                .getVersion());
        logger.info(
                "{}: Took {} ms to fetch version at timestamp {}",
                this.tablePath,
                snapshotQueryContext.getSnapshotMetrics()
                        .timestampToVersionResolutionTimer.totalDurationMs(),
                timestamp);
        snapshotQueryContext.setVersion(resolvedVersion);
        return getSnapshotAt(engine, resolvedVersion, snapshotQueryContext);
    }

    /**
     * Verifies that every version in the list is exactly one greater than its predecessor.
     *
     * @param versions delta versions in the order they should be contiguous
     * @param tablePath table path, used only for the error message
     * @throws InvalidTableException if a gap (or out-of-order entry) is found
     */
    @VisibleForTesting
    public static void verifyDeltaVersionsContiguous(List<Long> versions, Path tablePath) {
        for (int i = 1; i < versions.size(); i++) {
            if (versions.get(i) != versions.get(i - 1) + 1) {
                throw new InvalidTableException(
                        tablePath.toString(),
                        String.format(
                                "Missing delta files: versions are not contiguous: (%s)", versions));
            }
        }
    }

    /** Stores {@code candidate} as the latest hint unless an existing hint has an equal or higher version. */
    private void registerHint(SnapshotHint candidate) {
        this.latestSnapshotHint.updateAndGet(
                current ->
                        (current != null && candidate.getVersion() <= current.getVersion())
                                ? current
                                : candidate);
    }

    /**
     * Creates the snapshot for an already-validated log segment, reports it to the engine's
     * metrics reporters, and registers a hint for later snapshot loads.
     */
    private SnapshotImpl createSnapshot(
            LogSegment logSegment, Engine engine, SnapshotQueryContext snapshotQueryContext) {
        // When there is no checkpoint the suffix is Path.CUR_DIR, used here purely as trailing
        // text for the log sentence (NOTE(review): presumably the string "." - confirm in Path).
        final String checkpointSuffix =
                logSegment
                        .getCheckpointVersionOpt()
                        .map(v -> String.format("starting from checkpoint version %s.", v))
                        .orElse(Path.CUR_DIR);
        logger.info(
                "{}: Loading version {} {}",
                this.tablePath,
                logSegment.getVersion(),
                checkpointSuffix);

        final long startTimeMillis = System.currentTimeMillis();
        final LogReplay logReplay =
                new LogReplay(
                        this.logPath,
                        this.tablePath,
                        engine,
                        logSegment,
                        Optional.ofNullable(this.latestSnapshotHint.get()),
                        snapshotQueryContext.getSnapshotMetrics());
        LogReplayUtils.assertLogFilesBelongToTable(this.logPath, logSegment.allLogFilesUnsorted());
        final SnapshotImpl snapshot =
                new SnapshotImpl(
                        this.tablePath,
                        logSegment,
                        logReplay,
                        logReplay.getProtocol(),
                        logReplay.getMetadata(),
                        snapshotQueryContext);

        engine.getMetricsReporters()
                .forEach(reporter -> reporter.report(snapshot.getSnapshotReport()));
        logger.info(
                "{}: Took {}ms to construct the snapshot (loading protocol and metadata) for {} {}",
                this.tablePath,
                System.currentTimeMillis() - startTimeMillis,
                logSegment.getVersion(),
                checkpointSuffix);

        registerHint(
                new SnapshotHint(
                        snapshot.getVersion(), snapshot.getProtocol(), snapshot.getMetadata()));
        return snapshot;
    }

    /**
     * Lists the log directory and assembles the {@link LogSegment} needed to reconstruct a table
     * version: the latest complete checkpoint at or before the target, the commit and
     * log-compaction files after it, and the last checksum file at or after the checkpoint.
     *
     * @param engine engine used for file listing
     * @param versionToLoad version to load, or empty to load the latest version
     * @return the validated log segment
     * @throws TableNotFoundException if the listing is empty and no starting checkpoint was found
     * @throws InvalidTableException if required commit files are missing or not contiguous
     * @throws IllegalStateException if the resolved version exceeds the requested one, or if the
     *     files of the selected checkpoint are not all present in the listing
     */
    @VisibleForTesting
    public LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionToLoad) {
        logger.info(
                "Loading log segment for version {}",
                versionToLoad.map(String::valueOf).orElse("latest"));

        // Step 1: decide where to start listing from.
        final Optional<Long> startCheckpointVersion =
                getStartCheckpointVersion(engine, versionToLoad);
        final long listFromVersion;
        if (startCheckpointVersion.isPresent()) {
            logger.info("Found a complete checkpoint at version {}.", startCheckpointVersion.get());
            listFromVersion = startCheckpointVersion.get();
        } else {
            logger.warn("Cannot find a complete checkpoint. Listing from version 0.");
            listFromVersion = 0L;
        }

        // Step 2: list all relevant log files from that version onwards.
        final HashSet<FileNames.DeltaLogFileType> fileTypes =
                new HashSet<>(
                        Arrays.asList(
                                FileNames.DeltaLogFileType.COMMIT,
                                FileNames.DeltaLogFileType.CHECKPOINT,
                                FileNames.DeltaLogFileType.CHECKSUM,
                                FileNames.DeltaLogFileType.LOG_COMPACTION));
        final long listStartMillis = System.currentTimeMillis();
        final List<FileStatus> listedFiles =
                DeltaLogActionUtils.listDeltaLogFilesAsIter(
                                engine, fileTypes, this.tablePath, listFromVersion, versionToLoad, true)
                        .toInMemoryList();
        logger.info(
                "{}: Took {}ms to list the files after starting checkpoint",
                this.tablePath,
                System.currentTimeMillis() - listStartMillis);

        if (listedFiles.isEmpty()) {
            if (startCheckpointVersion.isPresent()) {
                // A checkpoint was advertised, yet nothing could be listed from it.
                throw DeltaErrors.missingCheckpoint(
                        this.tablePath.toString(), startCheckpointVersion.get());
            }
            throw new TableNotFoundException(
                    this.tablePath.toString(),
                    String.format("No delta files found in the directory: %s", this.logPath));
        }
        logDebugFileStatuses("listedFileStatuses", listedFiles);

        // Step 3: partition the listing by file type; LinkedHashMap keeps encounter order.
        final Map<FileNames.DeltaLogFileType, List<FileStatus>> filesByType =
                listedFiles.stream()
                        .collect(
                                Collectors.groupingBy(
                                        FileNames::determineFileType,
                                        LinkedHashMap::new,
                                        Collectors.toList()));
        final List<FileStatus> commitFiles =
                filesByType.getOrDefault(
                        FileNames.DeltaLogFileType.COMMIT, Collections.emptyList());
        final List<FileStatus> checkpointFiles =
                filesByType.getOrDefault(
                        FileNames.DeltaLogFileType.CHECKPOINT, Collections.emptyList());
        final List<FileStatus> compactionFiles =
                filesByType.getOrDefault(
                        FileNames.DeltaLogFileType.LOG_COMPACTION, Collections.emptyList());
        final List<FileStatus> checksumFiles =
                filesByType.getOrDefault(
                        FileNames.DeltaLogFileType.CHECKSUM, Collections.emptyList());
        logDebugFileStatuses("listedCheckpointFileStatuses", checkpointFiles);
        logDebugFileStatuses("listedCompactionFileStatuses", compactionFiles);
        logDebugFileStatuses("listedDeltaFileStatuses", commitFiles);
        logDebugFileStatuses("listedCheckSumFileStatuses", checksumFiles);

        // Step 4: pick the latest complete checkpoint that does not exceed the target version.
        final List<CheckpointInstance> listedCheckpoints =
                checkpointFiles.stream()
                        .map(file -> new CheckpointInstance(file.getPath()))
                        .collect(Collectors.toList());
        final CheckpointInstance notLaterThan =
                versionToLoad
                        .map(v -> new CheckpointInstance(v))
                        .orElse(CheckpointInstance.MAX_VALUE);
        final Optional<CheckpointInstance> latestCompleteCheckpoint =
                Checkpointer.getLatestCompleteCheckpointFromList(listedCheckpoints, notLaterThan);
        if (!latestCompleteCheckpoint.isPresent() && startCheckpointVersion.isPresent()) {
            throw DeltaErrors.missingCheckpoint(
                    this.tablePath.toString(), startCheckpointVersion.get());
        }
        // -1 means "no checkpoint", so that (checkpointVersion + 1) is version 0.
        final long checkpointVersion =
                latestCompleteCheckpoint.map(checkpoint -> checkpoint.version).orElse(-1L);
        logger.info("Latest complete checkpoint version: {}", checkpointVersion);

        // Step 5: keep only commits / compactions in (checkpointVersion, versionToLoad].
        final long maxVersion = versionToLoad.orElse(Long.MAX_VALUE);
        final List<FileStatus> deltasAfterCheckpoint =
                commitFiles.stream()
                        .filter(
                                file -> {
                                    final long deltaVersion = deltaVersionOf(file);
                                    return checkpointVersion + 1 <= deltaVersion
                                            && deltaVersion <= maxVersion;
                                })
                        .collect(Collectors.toList());
        logDebugFileStatuses("deltasAfterCheckpoint", deltasAfterCheckpoint);

        final List<FileStatus> compactionsAfterCheckpoint =
                compactionFiles.stream()
                        .filter(
                                file -> {
                                    final Tuple2<Long, Long> range =
                                            FileNames.logCompactionVersions(new Path(file.getPath()));
                                    return checkpointVersion + 1 <= range._1
                                            && range._2 <= maxVersion;
                                })
                        .collect(Collectors.toList());
        logDebugFileStatuses("compactionsAfterCheckpoint", compactionsAfterCheckpoint);

        final List<Long> deltaVersionsAfterCheckpoint =
                deltasAfterCheckpoint.stream()
                        .map(SnapshotManager::deltaVersionOf)
                        .collect(Collectors.toList());
        final long newVersion =
                deltaVersionsAfterCheckpoint.isEmpty()
                        ? checkpointVersion
                        : ListUtils.getLast(deltaVersionsAfterCheckpoint);
        logger.info("New version to load: {}", newVersion);

        // Step 6: validate what was found.
        if (!latestCompleteCheckpoint.isPresent() && deltasAfterCheckpoint.isEmpty()) {
            throw new InvalidTableException(
                    this.tablePath.toString(),
                    "No complete checkpoint found and no delta files found");
        }
        // The commit file at the checkpoint version itself must also have been listed.
        if (latestCompleteCheckpoint.isPresent()
                && commitFiles.stream()
                        .noneMatch(file -> deltaVersionOf(file) == checkpointVersion)) {
            throw new InvalidTableException(
                    this.tablePath.toString(),
                    String.format("Missing delta file for version %s", checkpointVersion));
        }
        if (versionToLoad.isPresent()) {
            final long requestedVersion = versionToLoad.get();
            if (newVersion < requestedVersion) {
                throw DeltaErrors.versionToLoadAfterLatestCommit(
                        this.tablePath.toString(), requestedVersion, newVersion);
            }
            if (newVersion > requestedVersion) {
                throw new IllegalStateException(
                        String.format(
                                "%s: Expected to load version %s but actually loaded version %s",
                                this.tablePath, requestedVersion, newVersion));
            }
        }
        if (!deltasAfterCheckpoint.isEmpty()) {
            verifyDeltaVersionsContiguous(deltaVersionsAfterCheckpoint, this.tablePath);
            if (deltaVersionsAfterCheckpoint.get(0) != checkpointVersion + 1) {
                throw new InvalidTableException(
                        this.tablePath.toString(),
                        String.format(
                                "Cannot compute snapshot. Missing delta file version %d.",
                                checkpointVersion + 1));
            }
            logger.info(
                    "Verified delta files are contiguous from version {} to {}",
                    checkpointVersion + 1,
                    newVersion);
        }

        // Step 7: resolve the selected checkpoint to the listed file statuses that make it up.
        final List<FileStatus> selectedCheckpointFiles =
                latestCompleteCheckpoint
                        .map(
                                checkpoint -> {
                                    final HashSet<Path> expectedPaths =
                                            new HashSet<>(
                                                    checkpoint.getCorrespondingFiles(this.logPath));
                                    final List<FileStatus> matched =
                                            checkpointFiles.stream()
                                                    .filter(
                                                            file ->
                                                                    expectedPaths.contains(
                                                                            new Path(file.getPath())))
                                                    .collect(Collectors.toList());
                                    logDebugFileStatuses("newCheckpointFileStatuses", matched);
                                    if (matched.size() != expectedPaths.size()) {
                                        throw new IllegalStateException(
                                                String.format(
                                                        "Seems like the checkpoint is corrupted. Failed in getting the file information for:\n%s\namong\n%s",
                                                        expectedPaths.stream()
                                                                .map(Path::toString)
                                                                .collect(Collectors.joining("\n - ")),
                                                        checkpointFiles.stream()
                                                                .map(FileStatus::getPath)
                                                                .collect(
                                                                        Collectors.joining("\n - "))));
                                    }
                                    return matched;
                                })
                        .orElse(Collections.emptyList());

        // Step 8: only the last listed checksum file is considered, and only if it is not older
        // than the checkpoint.
        Optional<FileStatus> lastSeenChecksum = Optional.empty();
        if (!checksumFiles.isEmpty()) {
            final FileStatus latestChecksum = ListUtils.getLast(checksumFiles);
            final long checksumVersion =
                    FileNames.checksumVersion(new Path(latestChecksum.getPath()));
            if (checksumVersion >= checkpointVersion) {
                lastSeenChecksum = Optional.of(latestChecksum);
            }
        }

        logger.info("Successfully constructed LogSegment at version {}", newVersion);
        // commitFiles is non-empty here: the checks in step 6 guarantee at least one commit file.
        final long lastCommitTimestamp = ListUtils.getLast(commitFiles).getModificationTime();
        return new LogSegment(
                this.logPath,
                newVersion,
                deltasAfterCheckpoint,
                compactionsAfterCheckpoint,
                selectedCheckpointFiles,
                lastSeenChecksum,
                lastCommitTimestamp);
    }

    /**
     * Determines the checkpoint version to start listing from.
     *
     * <p>With a target version, searches for the last complete checkpoint strictly before
     * {@code target + 1} and asserts that it is {@code <= target}. Without a target, uses the
     * version recorded in the last-checkpoint file, if any.
     *
     * @param engine engine used for file access
     * @param versionToLoad version to load, or empty for the latest version
     * @return the starting checkpoint version, or empty if none is known
     */
    private Optional<Long> getStartCheckpointVersion(Engine engine, Optional<Long> versionToLoad) {
        if (!versionToLoad.isPresent()) {
            return new Checkpointer(this.logPath)
                    .readLastCheckpointFile(engine)
                    .map(checkpointMetaData -> checkpointMetaData.version);
        }

        final long targetVersion = versionToLoad.get();
        logger.info("Finding last complete checkpoint at or before version {}", targetVersion);
        final long startTimeMillis = System.currentTimeMillis();
        return Checkpointer.findLastCompleteCheckpointBefore(engine, this.logPath, targetVersion + 1)
                .map(checkpoint -> checkpoint.version)
                .map(
                        foundVersion -> {
                            // Compare the checkpoint that was found against the requested target;
                            // the two values must be kept under distinct names for this check to
                            // be meaningful.
                            Preconditions.checkArgument(
                                    foundVersion <= targetVersion,
                                    "Last complete checkpoint version %s was not <= targetVersion %s",
                                    foundVersion,
                                    targetVersion);
                            logger.info(
                                    "{}: Took {}ms to find last complete checkpoint <= targetVersion {}",
                                    this.tablePath,
                                    System.currentTimeMillis() - startTimeMillis,
                                    targetVersion);
                            return foundVersion;
                        });
    }

    /** Parses the commit version out of a listed commit file's path. */
    private static long deltaVersionOf(FileStatus commitFile) {
        return FileNames.deltaVersion(new Path(commitFile.getPath()));
    }

    /** Logs the file names of {@code statuses} under {@code label}, only when debug logging is enabled. */
    private void logDebugFileStatuses(String label, List<FileStatus> statuses) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                    "{}: {}",
                    label,
                    Arrays.toString(
                            statuses.stream()
                                    .map(status -> new Path(status.getPath()).getName())
                                    .toArray()));
        }
    }
}
