package io.activej.cube.service;

import io.activej.aggregation.ActiveFsChunkStorage;
import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.util.LogUtils;
import io.activej.common.Utils;
import io.activej.common.function.BiConsumerEx;
import io.activej.common.function.FunctionEx;
import io.activej.common.initializer.WithInitializer;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiffScheme;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.exception.GraphExhaustedException;
import io.activej.ot.reducers.DiffsReducer;
import io.activej.ot.repository.OTRepository;
import io.activej.ot.system.OTSystem;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeCleanerController.class */
public final class CubeCleanerController<K, D, C> implements EventloopJmxBeanWithStats, WithInitializer<CubeCleanerController<K, D, C>> {
    public static final int DEFAULT_SNAPSHOTS_COUNT = 1;
    private final Eventloop eventloop;
    private final OTSystem<D> otSystem;
    private final OTRepository<K, D> repository;
    private final ActiveFsChunkStorage<C> chunksStorage;
    private final CubeDiffScheme<D> cubeDiffScheme;
    private Duration freezeTimeout;
    private Duration chunksCleanupDelay = DEFAULT_CHUNKS_CLEANUP_DELAY;
    private int extraSnapshotsCount = 1;
    private final PromiseStats promiseCleanup = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupCollectRequiredChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupRepository = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final PromiseStats promiseCleanupChunks = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final AsyncRunnable cleanup = AsyncRunnables.reuse(this::doCleanup);
    private static final Logger logger = LoggerFactory.getLogger(CubeCleanerController.class);
    public static final Duration DEFAULT_CHUNKS_CLEANUP_DELAY = Duration.ofMinutes(1);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    /* loaded from: input_file:io/activej/cube/service/CubeCleanerController$Tuple.class */
    private static class Tuple<K, D, C> {
        final Set<C> collectedChunks;
        final OTCommit<K, D> lastSnapshot;

        Tuple(Set<C> set, OTCommit<K, D> oTCommit) {
            this.collectedChunks = set;
            this.lastSnapshot = oTCommit;
        }
    }

    CubeCleanerController(Eventloop eventloop, CubeDiffScheme<D> cubeDiffScheme, OTRepository<K, D> oTRepository, OTSystem<D> oTSystem, ActiveFsChunkStorage<C> activeFsChunkStorage) {
        this.eventloop = eventloop;
        this.cubeDiffScheme = cubeDiffScheme;
        this.otSystem = oTSystem;
        this.repository = oTRepository;
        this.chunksStorage = activeFsChunkStorage;
    }

    public static <K, D, C> CubeCleanerController<K, D, C> create(Eventloop eventloop, CubeDiffScheme<D> cubeDiffScheme, OTRepository<K, D> oTRepository, OTSystem<D> oTSystem, ActiveFsChunkStorage<C> activeFsChunkStorage) {
        return new CubeCleanerController<>(eventloop, cubeDiffScheme, oTRepository, oTSystem, activeFsChunkStorage);
    }

    public CubeCleanerController<K, D, C> withChunksCleanupDelay(Duration duration) {
        this.chunksCleanupDelay = duration;
        return this;
    }

    public CubeCleanerController<K, D, C> withExtraSnapshotsCount(int i) {
        this.extraSnapshotsCount = i;
        return this;
    }

    public CubeCleanerController<K, D, C> withFreezeTimeout(Duration duration) {
        this.freezeTimeout = duration;
        return this;
    }

    public Promise<Void> cleanup() {
        return this.cleanup.run();
    }

    private Promise<Void> doCleanup() {
        return this.repository.getHeads().then(set -> {
            return OTAlgorithms.excludeParents(this.repository, this.otSystem, set);
        }).mapException(exc -> {
            return !(exc instanceof GraphExhaustedException);
        }, exc2 -> {
            return new CubeException("Failed to get heads", exc2);
        }).then(set2 -> {
            return findFrozenCut(set2, this.eventloop.currentInstant().minus((TemporalAmount) this.freezeTimeout));
        }).then(this::cleanupFrozenCut).then((r3, exc3) -> {
            return exc3 instanceof GraphExhaustedException ? Promise.of((Object) null) : Promise.of(r3, exc3);
        }).whenComplete(this.promiseCleanup.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[0]));
    }

    private Promise<Set<K>> findFrozenCut(Set<K> set, Instant instant) {
        return OTAlgorithms.findCut(this.repository, this.otSystem, set, collection -> {
            return collection.stream().allMatch(oTCommit -> {
                return oTCommit.getInstant().compareTo(instant) < 0;
            });
        }).mapException(exc -> {
            return !(exc instanceof GraphExhaustedException);
        }, exc2 -> {
            return new CubeException("Failed to find frozen cut, freeze timestamp: " + instant, exc2);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{set, instant}));
    }

    private Promise<Void> cleanupFrozenCut(Set<K> set) {
        return OTAlgorithms.findAllCommonParents(this.repository, this.otSystem, set).then(set2 -> {
            return OTAlgorithms.findAnyCommonParent(this.repository, this.otSystem, set2);
        }).then(obj -> {
            return this.repository.hasSnapshot(obj).then(bool -> {
                if (!bool.booleanValue()) {
                    return trySaveSnapshotAndCleanupChunks(obj);
                }
                logger.info("Snapshot already exists, skip cleanup");
                return Promise.complete();
            });
        }).mapException(exc -> {
            return !(exc instanceof GraphExhaustedException);
        }, exc2 -> {
            return new CubeException("Failed to cleanup frozen cut: " + Utils.toString(set), exc2);
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{set}));
    }

    private Promise<Void> trySaveSnapshotAndCleanupChunks(K k) {
        return OTAlgorithms.checkout(this.repository, this.otSystem, k).then(list -> {
            return this.repository.saveSnapshot(k, list).then(() -> {
                return findSnapshot(Collections.singleton(k), this.extraSnapshotsCount);
            }).then(optional -> {
                if (optional.isPresent()) {
                    return Promises.toTuple(Tuple::new, collectRequiredChunks(k), this.repository.loadCommit(optional.get())).then(tuple -> {
                        return cleanup(optional.get(), Utils.union(io.activej.cube.Utils.chunksInDiffs(this.cubeDiffScheme, list), tuple.collectedChunks), tuple.lastSnapshot.getInstant().minus((TemporalAmount) this.chunksCleanupDelay));
                    });
                }
                logger.info("Not enough snapshots, skip cleanup");
                return Promise.complete();
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k}));
    }

    private Promise<Optional<K>> findSnapshot(Set<K> set, int i) {
        return Promise.ofCallback(settablePromise -> {
            findSnapshotImpl(set, i, settablePromise);
        });
    }

    private void findSnapshotImpl(Set<K> set, int i, SettablePromise<Optional<K>> settablePromise) {
        Promise whenResult = OTAlgorithms.findParent(this.repository, this.otSystem, set, DiffsReducer.toVoid(), oTCommit -> {
            return this.repository.hasSnapshot(oTCommit.getId());
        }).async().whenResult(findResult -> {
            if (i <= 0) {
                settablePromise.set(Optional.of(findResult.getCommit()));
            } else if (findResult.getCommitParents().isEmpty()) {
                settablePromise.set(Optional.empty());
            } else {
                findSnapshotImpl(findResult.getCommitParents(), i - 1, settablePromise);
            }
        });
        Objects.requireNonNull(settablePromise);
        whenResult.whenException(settablePromise::setException);
    }

    private Promise<Set<C>> collectRequiredChunks(K k) {
        return this.repository.getHeads().then(set -> {
            return OTAlgorithms.reduceEdges(this.repository, this.otSystem, set, k, DiffsReducer.of(new HashSet(), (set, list) -> {
                return Utils.union(set, io.activej.cube.Utils.chunksInDiffs(this.cubeDiffScheme, list));
            }, Utils::union)).whenComplete(this.promiseCleanupCollectRequiredChunks.recordStats());
        }).map(map -> {
            return (Set) map.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toSet());
        }).whenComplete(transform((v0) -> {
            return v0.size();
        }, LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k})));
    }

    private Promise<Void> cleanup(K k, Set<C> set, Instant instant) {
        return this.chunksStorage.checkRequiredChunks(set).then(() -> {
            return this.repository.cleanup(k).whenComplete(this.promiseCleanupRepository.recordStats());
        }).then(() -> {
            return this.chunksStorage.cleanup(set, instant).whenComplete(this.promiseCleanupChunks.recordStats());
        }).whenComplete(logger.isTraceEnabled() ? LogUtils.toLogger(logger, LogUtils.Level.TRACE, LogUtils.thisMethod(), new Object[]{k, instant, set}) : LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{k, instant, Utils.toString(set)}));
    }

    @JmxAttribute
    public Duration getChunksCleanupDelay() {
        return this.chunksCleanupDelay;
    }

    @JmxAttribute
    public void setChunksCleanupDelay(Duration duration) {
        this.chunksCleanupDelay = duration;
    }

    @JmxAttribute
    public int getExtraSnapshotsCount() {
        return this.extraSnapshotsCount;
    }

    @JmxAttribute
    public void setExtraSnapshotsCount(int i) {
        this.extraSnapshotsCount = i;
    }

    @JmxAttribute
    public Duration getFreezeTimeout() {
        return this.freezeTimeout;
    }

    @JmxAttribute
    public void setFreezeTimeout(Duration duration) {
        this.freezeTimeout = duration;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanup() {
        return this.promiseCleanup;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupCollectRequiredChunks() {
        return this.promiseCleanupCollectRequiredChunks;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupRepository() {
        return this.promiseCleanupRepository;
    }

    @JmxAttribute
    public PromiseStats getPromiseCleanupChunks() {
        return this.promiseCleanupChunks;
    }

    @JmxOperation
    public void cleanupNow() {
        cleanup();
    }

    @NotNull
    public Eventloop getEventloop() {
        return this.eventloop;
    }

    private static <T, R> BiConsumerEx<R, Exception> transform(FunctionEx<? super R, ? extends T> functionEx, BiConsumerEx<? super T, Exception> biConsumerEx) {
        return (obj, exc) -> {
            biConsumerEx.accept(obj != null ? functionEx.apply(obj) : null, exc);
        };
    }
}
