package io.datakernel.cube.service;

import io.datakernel.aggregation.LocalFsChunkStorage;
import io.datakernel.async.AsyncCallable;
import io.datakernel.async.Stage;
import io.datakernel.cube.ot.CubeDiff;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.jmx.ValueStats;
import io.datakernel.logfs.ot.LogDiff;
import io.datakernel.ot.DiffsReducer;
import io.datakernel.ot.OTAlgorithms;
import io.datakernel.ot.OTCommit;
import io.datakernel.ot.OTRemote;
import io.datakernel.ot.OTRemoteSql;
import io.datakernel.util.CollectionUtils;
import io.datakernel.util.LogUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/cube/service/CubeCleanerController.class */
/**
 * Drives cleanup of a cube's OT history and of its local chunk storage.
 *
 * <p>One cleanup pass ({@link #cleanup()}) performs the following steps, in order:
 * <ol>
 *   <li>loads the current heads from the remote and asks {@code algorithms.findCut} for a cut whose
 *       commits are all older than {@code now - freezeTimeout};</li>
 *   <li>finds the common parents of that cut and their first common parent;</li>
 *   <li>checks out that node and saves it as a snapshot via {@code remote.saveSnapshot};</li>
 *   <li>walks back {@code extraSnapshotsCount} further snapshots; if there are not enough, the pass
 *       stops without cleaning anything;</li>
 *   <li>collects the chunk ids referenced between the heads and the found snapshot, verifies they are
 *       all present in the chunk storage, then calls {@code remote.cleanup} and
 *       {@code chunksStorage.cleanupBeforeTimestamp}.</li>
 * </ol>
 *
 * <p>The freeze timeout has no default and must be supplied through
 * {@link #withFreezeTimeout(Duration)} or {@link #setFreezeTimeout(Duration)} before the first pass.
 */
public final class CubeCleanerController implements EventloopJmxMBeanEx {
    public static final int DEFAULT_SNAPSHOTS_COUNT = 1;
    public static final Duration DEFAULT_CHUNKS_CLEANUP_DELAY = Duration.ofMinutes(1);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final Logger logger = LoggerFactory.getLogger(CubeCleanerController.class);

    private final Eventloop eventloop;
    private final OTAlgorithms<Integer, LogDiff<CubeDiff>> algorithms;
    private final OTRemote<Integer, LogDiff<CubeDiff>> remote;
    private final LocalFsChunkStorage chunksStorage;

    /** Minimum age of commits that may be part of the frozen cut; {@code null} until configured. */
    private Duration freezeTimeout;
    /** Subtracted from the snapshot commit timestamp to obtain the chunk cleanup timestamp. */
    private Duration chunksCleanupDelay = DEFAULT_CHUNKS_CLEANUP_DELAY;
    /** Number of additional snapshots to step back over before choosing the cleanup point. */
    private int extraSnapshotsCount = DEFAULT_SNAPSHOTS_COUNT;

    private final ValueStats chunksCount = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanup = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanupCollectRequiredChunks = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanupCheckRequiredChunks = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanupRemote = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageCleanupChunks = StageStats.create(DEFAULT_SMOOTHING_WINDOW);

    // NOTE(review): sharedCall presumably coalesces overlapping cleanup() invocations into one
    // running doCleanup() - confirm against AsyncCallable.
    private final AsyncCallable<Void> cleanup = AsyncCallable.sharedCall(this::doCleanup);

    CubeCleanerController(Eventloop eventloop,
            OTAlgorithms<Integer, LogDiff<CubeDiff>> algorithms,
            OTRemoteSql<LogDiff<CubeDiff>> remote,
            LocalFsChunkStorage chunksStorage) {
        this.eventloop = eventloop;
        this.algorithms = algorithms;
        this.remote = remote;
        this.chunksStorage = chunksStorage;
    }

    /**
     * Creates a controller that uses the remote exposed by {@code algorithms.getRemote()}.
     *
     * @param eventloop     event loop whose clock is used to compute the freeze timestamp
     * @param algorithms    OT algorithms used for graph traversal and checkout
     * @param chunksStorage storage whose chunks are verified and cleaned up
     * @return a new controller with default cleanup delay and snapshot count and no freeze timeout
     */
    public static CubeCleanerController create(Eventloop eventloop,
            OTAlgorithms<Integer, LogDiff<CubeDiff>> algorithms,
            LocalFsChunkStorage chunksStorage) {
        return new CubeCleanerController(eventloop, algorithms, algorithms.getRemote(), chunksStorage);
    }

    /** Sets the chunk cleanup delay and returns this controller for chaining. */
    public CubeCleanerController withChunksCleanupDelay(Duration duration) {
        this.chunksCleanupDelay = duration;
        return this;
    }

    /** Sets the number of extra snapshots to keep and returns this controller for chaining. */
    public CubeCleanerController withExtraSnapshotCount(int i) {
        this.extraSnapshotsCount = i;
        return this;
    }

    /** Sets the freeze timeout and returns this controller for chaining. */
    public CubeCleanerController withFreezeTimeout(Duration duration) {
        this.freezeTimeout = duration;
        return this;
    }

    /**
     * Collects the ids of every chunk that the given diffs either add or remove.
     *
     * @param diffs log diffs to scan
     * @return set of chunk ids mentioned in any aggregation diff of any cube diff
     */
    private static Set<Long> chunks(List<LogDiff<CubeDiff>> diffs) {
        return diffs.stream()
                .flatMap(LogDiff::diffs)
                .flatMap(cubeDiff -> cubeDiff.keySet().stream().map(cubeDiff::get))
                .flatMap(aggregationDiff -> Stream.concat(
                        aggregationDiff.getAddedChunks().stream(),
                        aggregationDiff.getRemovedChunks().stream()))
                .map(chunk -> chunk.getChunkId())
                .collect(Collectors.toSet());
    }

    /** Flattens the per-parent diff lists of a commit into one stream. Not called within this class. */
    private static Stream<LogDiff<CubeDiff>> commitToDiffs(OTCommit<Integer, LogDiff<CubeDiff>> commit) {
        return commit.getParents().values().stream().flatMap(parentDiffs -> parentDiffs.stream());
    }

    /**
     * Runs a cleanup pass through the shared {@link AsyncCallable}.
     *
     * @return stage that completes when the pass finishes or fails
     */
    public Stage<Void> cleanup() {
        return this.cleanup.call();
    }

    /**
     * Performs one cleanup pass: heads, frozen cut, then {@link #cleanupFrozenCut(Set)}.
     *
     * @return stage of the pass; fails with {@link IllegalStateException} when no freeze timeout is set
     */
    Stage<Void> doCleanup() {
        // Without this check a missing timeout surfaces as a bare NullPointerException thrown from
        // inside the getHeads() callback, far from the actual misconfiguration.
        Duration timeout = this.freezeTimeout;
        if (timeout == null) {
            this.logger.warn("Freeze timeout is not set, cleanup is not started");
            return Stage.ofException(new IllegalStateException(
                    "Freeze timeout is not set; configure it via withFreezeTimeout() or setFreezeTimeout()"));
        }
        // NOTE(review): LogUtils.thisMethod() presumably resolves the name of the calling method,
        // so it is kept directly in this method body rather than inside a lambda or helper.
        return this.remote.getHeads()
                .thenCompose(heads ->
                        findFrozenCut(heads, this.eventloop.currentTimeMillis() - timeout.toMillis()))
                .thenCompose(this::cleanupFrozenCut)
                .whenComplete(this.stageCleanup.recordStats())
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod()));
    }

    /**
     * Asks the OT algorithms for a cut, starting from {@code heads}, in which every commit has a
     * timestamp strictly below {@code freezeTimestamp}.
     *
     * @param heads           revisions to start from
     * @param freezeTimestamp exclusive upper bound (epoch millis) for commit timestamps in the cut
     * @return stage with the revision ids of the cut
     */
    Stage<Set<Integer>> findFrozenCut(Set<Integer> heads, long freezeTimestamp) {
        return this.algorithms.findCut(heads,
                        commits -> commits.stream().allMatch(commit -> commit.getTimestamp() < freezeTimestamp))
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), heads, freezeTimestamp));
    }

    /**
     * Finds the bottom nodes of the frozen cut and, if any exist, snapshots and cleans up at their
     * first common parent. Completes normally without doing anything when either lookup is empty.
     *
     * @param frozenCut revision ids of the frozen cut
     * @return stage of the cleanup
     */
    Stage<Void> cleanupFrozenCut(Set<Integer> frozenCut) {
        this.logger.info("Frozen cut: {}", frozenCut);
        return findBottomNodes(frozenCut)
                .thenCompose(bottomNodes -> bottomNodes.isPresent()
                        ? cleanupAtFirstCommonParent(bottomNodes.get())
                        : Stage.of((Void) null))
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), frozenCut));
    }

    /** Resolves the first common parent of {@code bottomNodes} and runs snapshot-and-cleanup on it, if found. */
    private Stage<Void> cleanupAtFirstCommonParent(Set<Integer> bottomNodes) {
        return this.algorithms.findFirstCommonParent(bottomNodes)
                .thenCompose(checkpointNode -> checkpointNode.isPresent()
                        ? trySaveSnapshotAndCleanupChunks(checkpointNode.get())
                        : Stage.of((Void) null));
    }

    /**
     * Looks up the common parents of the given revisions.
     *
     * @param heads revisions whose common parents are requested
     * @return stage with the common parents, or an empty {@link Optional} when there are none
     */
    Stage<Optional<Set<Integer>>> findBottomNodes(Set<Integer> heads) {
        return this.algorithms.findCommonParents(heads)
                .thenApply(commonParents -> commonParents.isEmpty()
                        ? Optional.<Set<Integer>>empty()
                        : Optional.of(commonParents))
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), heads));
    }

    /**
     * Checks out {@code checkpointNode}, stores the result as a snapshot, then looks for an older
     * snapshot ({@code extraSnapshotsCount} snapshots further back) and cleans up relative to it.
     * When no such snapshot exists the cleanup is skipped.
     *
     * @param checkpointNode revision to snapshot
     * @return stage of the snapshot and cleanup
     */
    Stage<Void> trySaveSnapshotAndCleanupChunks(Integer checkpointNode) {
        return this.algorithms.checkout(checkpointNode)
                .thenCompose(checkpointDiffs -> this.remote.saveSnapshot(checkpointNode, checkpointDiffs)
                        .thenCompose(ignored ->
                                findSnapshot(Collections.singleton(checkpointNode), this.extraSnapshotsCount))
                        .thenCompose(lastSnapshot -> lastSnapshot.isPresent()
                                ? cleanupBelowSnapshot(lastSnapshot.get(), checkpointDiffs)
                                : notEnoughSnapshots()))
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(), checkpointNode));
    }

    /**
     * Computes the set of chunks to keep (chunks required between the heads and {@code snapshotNode}
     * plus chunks mentioned by {@code checkpointDiffs}) and the cleanup timestamp (snapshot commit
     * timestamp minus the chunk cleanup delay), then delegates to the three-argument cleanup.
     */
    private Stage<Void> cleanupBelowSnapshot(Integer snapshotNode, List<LogDiff<CubeDiff>> checkpointDiffs) {
        return collectRequiredChunks(snapshotNode)
                .thenApply(requiredChunks -> CollectionUtils.union(chunks(checkpointDiffs), requiredChunks))
                .thenCompose(requiredChunks -> this.remote.loadCommit(snapshotNode)
                        .thenApply(commit -> commit.getTimestamp() - this.chunksCleanupDelay.toMillis())
                        .thenCompose(cleanupTimestamp -> cleanup(snapshotNode, requiredChunks, cleanupTimestamp)));
    }

    /**
     * Searches the ancestry of {@code heads} for a snapshot commit, stepping over the first
     * {@code skipSnapshots} snapshots found.
     *
     * @param heads         revisions to start the search from
     * @param skipSnapshots how many snapshots to step over; values {@code <= 0} return the first one found
     * @return stage with the snapshot revision id, or an empty {@link Optional} when the search runs out
     */
    Stage<Optional<Integer>> findSnapshot(Set<Integer> heads, int skipSnapshots) {
        return this.algorithms.findParent(heads, DiffsReducer.toVoid(), OTCommit::isSnapshot, null)
                .post()
                .thenCompose(findResult -> {
                    if (!findResult.isFound()) {
                        return Stage.of(Optional.<Integer>empty());
                    }
                    if (skipSnapshots <= 0) {
                        return Stage.of(Optional.of(findResult.getCommit()));
                    }
                    // Continue the search from the parents of the snapshot that was just found.
                    return findSnapshot(findResult.getCommitParents(), skipSnapshots - 1);
                });
    }

    /** Logs that cleanup is skipped and returns an already-completed stage. */
    private Stage<Void> notEnoughSnapshots() {
        this.logger.info("Not enough snapshots, skip cleanup");
        return Stage.of((Void) null);
    }

    /**
     * Adapts a completion callback so that it receives a mapped result.
     *
     * @param function   mapping applied to a non-null result; a {@code null} result is passed on as {@code null}
     * @param biConsumer callback receiving the mapped result and the unchanged throwable
     * @return callback accepting the original result type
     */
    private static <T, R> BiConsumer<R, Throwable> transform(Function<? super R, ? extends T> function,
            BiConsumer<? super T, Throwable> biConsumer) {
        return (result, throwable) ->
                biConsumer.accept(result != null ? function.apply(result) : null, throwable);
    }

    /**
     * Collects the ids of all chunks mentioned on the edges between the current remote heads and
     * {@code snapshotNode}. Only the size of the resulting set is passed to the logger.
     *
     * @param snapshotNode revision at which edge reduction stops
     * @return stage with the union of chunk ids over all reduced edges
     */
    private Stage<Set<Long>> collectRequiredChunks(Integer snapshotNode) {
        return this.remote.getHeads()
                .thenCompose(heads -> this.algorithms.reduceEdges(heads, snapshotNode,
                                DiffsReducer.of(
                                        new HashSet<>(),
                                        // Explicit parameter types pin the accumulator type to Set<Long>.
                                        (Set<Long> accumulatedChunks, List<LogDiff<CubeDiff>> diffs) ->
                                                CollectionUtils.union(accumulatedChunks, chunks(diffs)),
                                        CollectionUtils::union))
                        .whenComplete(this.stageCleanupCollectRequiredChunks.recordStats()))
                .thenApply(accumulators -> accumulators.values().stream()
                        .flatMap(chunkIds -> chunkIds.stream())
                        .collect(Collectors.toSet()))
                .whenComplete(transform(Set::size,
                        LogUtils.toLogger(this.logger, LogUtils.thisMethod(), snapshotNode)));
    }

    /**
     * Verifies that all required chunks are present in storage, then calls {@code remote.cleanup}
     * and {@code chunksStorage.cleanupBeforeTimestamp}, in that order.
     *
     * @param snapshotNode           revision passed to {@code remote.cleanup}
     * @param requiredChunks         chunk ids passed to the storage cleanup as the set to preserve
     *                               (presumably; confirm against LocalFsChunkStorage)
     * @param chunksCleanupTimestamp timestamp (epoch millis) passed to the storage cleanup
     * @return stage of the cleanup; fails if a required chunk is missing from storage
     */
    private Stage<Void> cleanup(Integer snapshotNode, Set<Long> requiredChunks, long chunksCleanupTimestamp) {
        return checkRequiredChunks(requiredChunks)
                .thenCompose(ignored -> this.remote.cleanup(snapshotNode)
                        .whenComplete(this.stageCleanupRemote.recordStats()))
                .thenCompose(ignored -> this.chunksStorage.cleanupBeforeTimestamp(requiredChunks, chunksCleanupTimestamp)
                        .whenComplete(this.stageCleanupChunks.recordStats()))
                // The full chunk set is logged only at TRACE; otherwise an abbreviated form is used.
                .whenComplete(this.logger.isTraceEnabled()
                        ? LogUtils.toLogger(this.logger, LogUtils.Level.TRACE, LogUtils.thisMethod(),
                                snapshotNode, chunksCleanupTimestamp, requiredChunks)
                        : LogUtils.toLogger(this.logger, LogUtils.thisMethod(),
                                snapshotNode, chunksCleanupTimestamp, CollectionUtils.toLimitedString(requiredChunks, 6)));
    }

    /**
     * Lists the chunks in storage, records their count, and fails when any required chunk is absent.
     *
     * @param requiredChunks chunk ids that must exist in storage
     * @return completed stage when all are present, otherwise a stage failed with {@link IllegalStateException}
     */
    private Stage<Void> checkRequiredChunks(Set<Long> requiredChunks) {
        // Both predicates accept everything, so the listing is unfiltered.
        return this.chunksStorage.list(s -> true, l -> true)
                .whenResult(storedChunks -> this.chunksCount.recordValue(storedChunks.size()))
                .thenCompose(storedChunks -> storedChunks.containsAll(requiredChunks)
                        ? Stage.of((Void) null)
                        : Stage.ofException(new IllegalStateException("Missed chunks from storage: "
                                + CollectionUtils.toLimitedString(CollectionUtils.difference(requiredChunks, storedChunks), 100))))
                .whenComplete(this.stageCleanupCheckRequiredChunks.recordStats())
                .whenComplete(LogUtils.toLogger(this.logger, LogUtils.thisMethod(),
                        CollectionUtils.toLimitedString(requiredChunks, 6)));
    }

    @JmxAttribute
    public Duration getChunksCleanupDelay() {
        return this.chunksCleanupDelay;
    }

    @JmxAttribute
    public void setChunksCleanupDelay(Duration duration) {
        this.chunksCleanupDelay = duration;
    }

    @JmxAttribute
    public int getExtraSnapshotsCount() {
        return this.extraSnapshotsCount;
    }

    @JmxAttribute
    public void setExtraSnapshotsCount(int i) {
        this.extraSnapshotsCount = i;
    }

    @JmxAttribute
    public Duration getFreezeTimeout() {
        return this.freezeTimeout;
    }

    @JmxAttribute
    public void setFreezeTimeout(Duration duration) {
        this.freezeTimeout = duration;
    }

    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    @JmxAttribute
    public StageStats getStageCleanup() {
        return this.stageCleanup;
    }

    @JmxAttribute
    public StageStats getStageCleanupCollectRequiredChunks() {
        return this.stageCleanupCollectRequiredChunks;
    }

    @JmxAttribute
    public StageStats getStageCleanupCheckRequiredChunks() {
        return this.stageCleanupCheckRequiredChunks;
    }

    @JmxAttribute
    public StageStats getStageCleanupRemote() {
        return this.stageCleanupRemote;
    }

    @JmxAttribute
    public StageStats getStageCleanupChunks() {
        return this.stageCleanupChunks;
    }

    /** JMX operation that starts a cleanup pass; the resulting stage is not awaited here. */
    @JmxOperation
    public void cleanupNow() {
        cleanup();
    }

    public Eventloop getEventloop() {
        return this.eventloop;
    }
}
