package io.datakernel.cube.service;

import io.datakernel.aggregation.AggregationChunk;
import io.datakernel.aggregation.AggregationChunkStorage;
import io.datakernel.async.AsyncCallable;
import io.datakernel.async.AsyncPredicate;
import io.datakernel.async.Stage;
import io.datakernel.async.Stages;
import io.datakernel.cube.Cube;
import io.datakernel.cube.ot.CubeDiff;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.jmx.ValueStats;
import io.datakernel.logfs.ot.LogDiff;
import io.datakernel.logfs.ot.LogOTProcessor;
import io.datakernel.logfs.ot.LogOTState;
import io.datakernel.ot.OTStateManager;
import io.datakernel.ot.OTSystem;
import io.datakernel.util.LogUtils;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/cube/service/CubeLogProcessorController.class */
/**
 * Drives one "process logs" cycle for a cube: pulls the OT state, checks a
 * gating predicate, runs every configured {@link LogOTProcessor}, squashes the
 * produced diffs into the state manager, and then merges, commits and pushes
 * the result. Timing and chunk statistics are exposed through JMX attributes.
 */
public final class CubeLogProcessorController implements EventloopJmxMBeanEx {
    private static final Logger logger = LoggerFactory.getLogger(CubeLogProcessorController.class);

    /** Smoothing window used for every statistic created by this class. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);

    private final Eventloop eventloop;
    private final List<LogOTProcessor<?, CubeDiff>> logProcessors;
    private final OTSystem<LogDiff<CubeDiff>> otSystem;
    private final AggregationChunkStorage chunkStorage;
    private final OTStateManager<Integer, LogDiff<CubeDiff>> stateManager;
    private final AsyncPredicate<Integer> predicate;

    /** When {@code true} the log processors are started together instead of one after another. */
    private boolean parallelRunner;

    private final StageStats stageProcessLogs = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageProcessLogsImpl = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final ValueStats addedChunksRecords = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);

    // NOTE(review): presumably sharedCall lets overlapping callers share a single in-flight
    // run of doProcessLogs() - confirm against AsyncCallable.sharedCall.
    private final AsyncCallable<Boolean> processLogs = AsyncCallable.sharedCall(this::doProcessLogs);

    CubeLogProcessorController(Eventloop eventloop,
            List<LogOTProcessor<?, CubeDiff>> logProcessors,
            OTSystem<LogDiff<CubeDiff>> otSystem,
            AggregationChunkStorage chunkStorage,
            OTStateManager<Integer, LogDiff<CubeDiff>> stateManager,
            AsyncPredicate<Integer> predicate) {
        this.eventloop = eventloop;
        this.logProcessors = logProcessors;
        this.otSystem = otSystem;
        this.chunkStorage = chunkStorage;
        this.stateManager = stateManager;
        this.predicate = predicate;
    }

    /**
     * Creates a controller whose gating predicate rejects a run (and logs a message) while the
     * cube held by the state manager reports an excessive number of overlapping chunks.
     *
     * @param eventloop     eventloop returned by {@link #getEventloop()}
     * @param stateManager  OT state manager; its state must be a {@link LogOTState} whose data
     *                      state is a {@link Cube}, otherwise a {@link ClassCastException} is thrown
     * @param chunkStorage  storage on which the added chunks are finished after commit
     * @param logProcessors processors executed on every run
     * @return a new controller with the parallel runner disabled
     */
    public static CubeLogProcessorController create(Eventloop eventloop,
            OTStateManager<Integer, LogDiff<CubeDiff>> stateManager,
            AggregationChunkStorage chunkStorage,
            List<LogOTProcessor<?, CubeDiff>> logProcessors) {
        OTSystem<LogDiff<CubeDiff>> otSystem = stateManager.getAlgorithms().getOtSystem();
        Cube cube = (Cube) ((LogOTState<?>) stateManager.getState()).getDataState();
        AsyncPredicate<Integer> predicate = AsyncPredicate.of(ignored -> {
            if (!cube.containsExcessiveNumberOfOverlappingChunks()) {
                return true;
            }
            logger.info("Cube contains excessive number of overlapping chunks");
            return false;
        });
        return new CubeLogProcessorController(eventloop, logProcessors, otSystem, chunkStorage, stateManager, predicate);
    }

    /**
     * Fluent setter for the parallel-runner flag.
     *
     * @param z {@code true} to start all log processors together, {@code false} to run them in sequence
     * @return this controller
     */
    public CubeLogProcessorController withParallelRunner(boolean z) {
        this.parallelRunner = z;
        return this;
    }

    /**
     * Starts a log-processing run through the shared callable.
     *
     * @return stage completing with {@code true} when logs were processed and pushed, or
     *         {@code false} when the gating predicate rejected the run
     */
    public Stage<Boolean> processLogs() {
        return processLogs.call();
    }

    /** Runs {@link #process()}, recording its outcome in the stage statistics and the log. */
    Stage<Boolean> doProcessLogs() {
        return process()
                .whenComplete(stageProcessLogs.recordStats())
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), stateManager));
    }

    /**
     * Performs one full cycle: pull, predicate check, log processing, then
     * add/merge/commit/finish-chunks/push.
     *
     * @return stage completing with {@code false} if the predicate rejected the pulled value,
     *         {@code true} after a successful push
     */
    Stage<Boolean> process() {
        return stateManager.pull()
                .thenCompose(predicate::test)
                .thenCompose(ok -> {
                    if (!ok) {
                        return Stage.of(false);
                    }
                    logger.info("Pull to commit: {}, start log processing", stateManager.getRevision());

                    List<AsyncCallable<LogDiff<CubeDiff>>> tasks = logProcessors.stream()
                            .map(logProcessor -> AsyncCallable.of(logProcessor::processLog))
                            .collect(Collectors.toList());

                    Stage<List<LogDiff<CubeDiff>>> diffsStage = parallelRunner
                            ? Stages.collectToList(tasks.stream().map(AsyncCallable::call))
                            : Stages.collectSequence(tasks, Collectors.toList());

                    return diffsStage
                            .whenComplete(stageProcessLogsImpl.recordStats())
                            .whenResult(this::cubeDiffJmx)
                            .thenCompose(diffs -> {
                                // The squashed diffs are added to the local state before syncing with the remote.
                                stateManager.add(otSystem.squash(diffs));
                                return stateManager.pull()
                                        .thenCompose(pulled -> stateManager.getAlgorithms().mergeHeadsAndPush())
                                        .thenCompose(stateManager::pull)
                                        .thenCompose(merged -> stateManager.pull())
                                        .thenCompose(pulledAgain -> stateManager.commit())
                                        // Chunks are finished only after the commit stage has succeeded.
                                        .thenCompose(committed -> chunkStorage.finish(addedChunks(diffs)))
                                        .thenCompose(finished -> stateManager.push())
                                        .thenApply(pushed -> true);
                            });
                })
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), stateManager));
    }

    /**
     * Records, for one batch of diffs, how many chunks were added and the sum of their
     * {@code getCount()} values into the corresponding JMX statistics.
     */
    private void cubeDiffJmx(List<LogDiff<CubeDiff>> list) {
        long chunkCount = 0;
        long recordCount = 0;
        for (LogDiff<CubeDiff> logDiff : list) {
            for (CubeDiff cubeDiff : logDiff.getDiffs()) {
                for (String aggregationId : cubeDiff.keySet()) {
                    for (AggregationChunk chunk : cubeDiff.get(aggregationId).getAddedChunks()) {
                        chunkCount++;
                        recordCount += chunk.getCount();
                    }
                }
            }
        }
        addedChunks.recordValue(chunkCount);
        addedChunksRecords.recordValue(recordCount);
    }

    /** Collects the ids of every chunk added by any cube diff contained in the given log diffs. */
    private Set<Long> addedChunks(List<LogDiff<CubeDiff>> list) {
        return list.stream()
                .flatMap(LogDiff::diffs)
                .flatMap(CubeDiff::addedChunks)
                .collect(Collectors.toSet());
    }

    @Override
    public Eventloop getEventloop() {
        return eventloop;
    }

    @JmxAttribute
    public ValueStats getLastAddedChunks() {
        return addedChunks;
    }

    @JmxAttribute
    public ValueStats getLastAddedChunksRecords() {
        return addedChunksRecords;
    }

    @JmxAttribute
    public StageStats getStageProcessLogs() {
        return stageProcessLogs;
    }

    @JmxAttribute
    public StageStats getStageProcessLogsImpl() {
        return stageProcessLogsImpl;
    }

    @JmxAttribute
    public boolean isParallelRunner() {
        return parallelRunner;
    }

    @JmxAttribute
    public void setParallelRunner(boolean z) {
        this.parallelRunner = z;
    }

    /** JMX operation that triggers {@link #processLogs()} and discards the returned stage. */
    @JmxOperation
    public void processLogsNow() {
        processLogs();
    }
}
