package io.activej.cube.service;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.util.LogUtils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.cube.CubeState;
import io.activej.cube.aggregation.IAggregationChunkStorage;
import io.activej.cube.aggregation.ot.ProtoAggregationDiff;
import io.activej.cube.aggregation.util.Utils;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiff;
import io.activej.cube.ot.ProtoCubeDiff;
import io.activej.etl.LogDiff;
import io.activej.etl.LogProcessor;
import io.activej.etl.LogState;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.ValueStats;
import io.activej.ot.StateManager;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/cube/service/CubeLogProcessorController.class */
public final class CubeLogProcessorController extends AbstractReactive implements ReactiveJmxBeanWithStats {
    public static final int DEFAULT_OVERLAPPING_CHUNKS_THRESHOLD = 300;
    private static final Logger logger = LoggerFactory.getLogger(CubeLogProcessorController.class);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    private final List<LogProcessor<?, ProtoCubeDiff, CubeDiff>> logProcessors;
    private final IAggregationChunkStorage chunkStorage;
    private final StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager;
    private AsyncSupplier<Boolean> predicate;
    private boolean parallelRunner;
    private final PromiseStats promiseProcessLogs;
    private final PromiseStats promiseProcessLogsImpl;
    private final ValueStats addedChunks;
    private final ValueStats addedChunksRecords;
    private int maxOverlappingChunksToProcessLogs;
    private final AsyncSupplier<Boolean> processLogs;

    /* loaded from: input_file:io/activej/cube/service/CubeLogProcessorController$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, CubeLogProcessorController> {
        private Builder() {
        }

        public Builder withParallelRunner(boolean z) {
            checkNotBuilt(this);
            CubeLogProcessorController.this.parallelRunner = z;
            return this;
        }

        public Builder withMaxOverlappingChunksToProcessLogs(int i) {
            checkNotBuilt(this);
            CubeLogProcessorController.this.maxOverlappingChunksToProcessLogs = i;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public CubeLogProcessorController m67doBuild() {
            CubeLogProcessorController.this.predicate = AsyncSupplier.of(() -> {
                return (Boolean) CubeLogProcessorController.this.stateManager.query(logState -> {
                    if (!((CubeState) logState.getDataState()).containsExcessiveNumberOfOverlappingChunks(CubeLogProcessorController.this.maxOverlappingChunksToProcessLogs)) {
                        return true;
                    }
                    CubeLogProcessorController.logger.info("Cube contains excessive number of overlapping chunks");
                    return false;
                });
            });
            return CubeLogProcessorController.this;
        }
    }

    private CubeLogProcessorController(Reactor reactor, List<LogProcessor<?, ProtoCubeDiff, CubeDiff>> list, IAggregationChunkStorage iAggregationChunkStorage, StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager) {
        super(reactor);
        this.promiseProcessLogs = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseProcessLogsImpl = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.addedChunks = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.addedChunksRecords = (ValueStats) ValueStats.builder(DEFAULT_SMOOTHING_WINDOW).withRate().build();
        this.maxOverlappingChunksToProcessLogs = DEFAULT_OVERLAPPING_CHUNKS_THRESHOLD;
        this.processLogs = AsyncSuppliers.coalesce(this::doProcessLogs);
        this.logProcessors = list;
        this.chunkStorage = iAggregationChunkStorage;
        this.stateManager = stateManager;
    }

    public static CubeLogProcessorController create(Reactor reactor, StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager, IAggregationChunkStorage iAggregationChunkStorage, List<LogProcessor<?, ProtoCubeDiff, CubeDiff>> list) {
        return (CubeLogProcessorController) builder(reactor, stateManager, iAggregationChunkStorage, list).build();
    }

    public static Builder builder(Reactor reactor, StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager, IAggregationChunkStorage iAggregationChunkStorage, List<LogProcessor<?, ProtoCubeDiff, CubeDiff>> list) {
        return new Builder();
    }

    public Promise<Boolean> processLogs() {
        Reactive.checkInReactorThread(this);
        return this.processLogs.get();
    }

    Promise<Boolean> doProcessLogs() {
        Reactive.checkInReactorThread(this);
        return process().whenComplete(this.promiseProcessLogs.recordStats()).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this.stateManager}));
    }

    Promise<Boolean> process() {
        Reactive.checkInReactorThread(this);
        Promise complete = Promise.complete();
        StateManager<LogDiff<CubeDiff>, LogState<CubeDiff, CubeState>> stateManager = this.stateManager;
        Objects.requireNonNull(stateManager);
        return complete.then(stateManager::catchUp).mapException(exc -> {
            return new CubeException("Failed to synchronize state prior to log processing", exc);
        }).then(() -> {
            return this.predicate.get().mapException(exc2 -> {
                return new CubeException("Failed to test cube with predicate", exc2);
            });
        }).then(bool -> {
            if (!bool.booleanValue()) {
                return Promise.of(false);
            }
            logger.info("Start log processing");
            List list = (List) this.logProcessors.stream().map(logProcessor -> {
                Objects.requireNonNull(logProcessor);
                return logProcessor::processLog;
            }).collect(Collectors.toList());
            return (this.parallelRunner ? Promises.toList(list.stream().map((v0) -> {
                return v0.get();
            })) : Promises.reduce(Collectors.toList(), 1, Promises.asPromises(list))).mapException(exc2 -> {
                return new CubeException("Failed to process logs", exc2);
            }).whenComplete(this.promiseProcessLogsImpl.recordStats()).whenResult(this::cubeDiffJmx).then(list2 -> {
                return this.chunkStorage.finish(addedProtoChunks(list2)).mapException(exc3 -> {
                    return new CubeException("Failed to finalize chunks in storage", exc3);
                }).then(map -> {
                    return this.stateManager.push(Utils.materializeProtoDiff((List<LogDiff<ProtoCubeDiff>>) list2, (Map<String, Long>) map)).mapException(exc4 -> {
                        return new CubeException("Failed to synchronize state after log processing, resetting", exc4);
                    });
                }).map(r2 -> {
                    return true;
                });
            });
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this.stateManager}));
    }

    private void cubeDiffJmx(List<LogDiff<ProtoCubeDiff>> list) {
        long j = 0;
        long j2 = 0;
        Iterator<LogDiff<ProtoCubeDiff>> it = list.iterator();
        while (it.hasNext()) {
            Iterator it2 = it.next().getDiffs().iterator();
            while (it2.hasNext()) {
                Map<String, ProtoAggregationDiff> diffs = ((ProtoCubeDiff) it2.next()).diffs();
                Iterator<String> it3 = diffs.keySet().iterator();
                while (it3.hasNext()) {
                    j += r0.addedChunks().size();
                    while (diffs.get(it3.next()).addedChunks().iterator().hasNext()) {
                        j2 += r0.next().count();
                    }
                }
            }
        }
        this.addedChunks.recordValue(j);
        this.addedChunksRecords.recordValue(j2);
    }

    private Set<String> addedProtoChunks(List<LogDiff<ProtoCubeDiff>> list) {
        return (Set) list.stream().flatMap((v0) -> {
            return v0.diffs();
        }).flatMap((v0) -> {
            return v0.addedProtoChunks();
        }).collect(Collectors.toSet());
    }

    @JmxAttribute
    public ValueStats getLastAddedChunks() {
        return this.addedChunks;
    }

    @JmxAttribute
    public ValueStats getLastAddedChunksRecords() {
        return this.addedChunksRecords;
    }

    @JmxAttribute
    public PromiseStats getPromiseProcessLogs() {
        return this.promiseProcessLogs;
    }

    @JmxAttribute
    public PromiseStats getPromiseProcessLogsImpl() {
        return this.promiseProcessLogsImpl;
    }

    @JmxAttribute
    public boolean isParallelRunner() {
        return this.parallelRunner;
    }

    @JmxAttribute
    public void setParallelRunner(boolean z) {
        this.parallelRunner = z;
    }

    @JmxAttribute
    public int getMaxOverlappingChunksToProcessLogs() {
        return this.maxOverlappingChunksToProcessLogs;
    }

    @JmxAttribute
    public void setMaxOverlappingChunksToProcessLogs(int i) {
        this.maxOverlappingChunksToProcessLogs = i;
    }

    @JmxOperation
    public void processLogsNow() {
        processLogs();
    }
}
