package io.datakernel.logfs.ot;

import io.datakernel.async.AsyncCallable;
import io.datakernel.async.Stage;
import io.datakernel.async.StagesAccumulator;
import io.datakernel.cube.http.AggregationPredicateGsonAdapter;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.eventloop.EventloopService;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.logfs.LogFile;
import io.datakernel.logfs.LogManager;
import io.datakernel.logfs.LogPosition;
import io.datakernel.logfs.ot.LogDiff;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.processor.StreamUnion;
import io.datakernel.stream.stats.StreamStats;
import io.datakernel.stream.stats.StreamStatsBasic;
import io.datakernel.stream.stats.StreamStatsDetailed;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datakernel/logfs/ot/LogOTProcessor.class */
/**
 * Processes one named log spread over several partitions.
 *
 * <p>Each call to {@link #processLog()} opens a producer stream per partition through the
 * {@link LogManager}, merges those streams with a {@link StreamUnion}, feeds the merged stream
 * into the consumer obtained from {@link LogDataConsumer#consume()} and finally returns a
 * {@link LogDiff} built from the per-partition position changes and the consumer's result.
 *
 * <p>Read positions are looked up in the supplied {@link LogOTState} under the key produced by
 * {@link #logName(String)}. This class only reads the state; it never writes to it.
 *
 * @param <T> type of the records streamed from the log
 * @param <D> type of the diffs produced by the log data consumer
 */
public final class LogOTProcessor<T, D> implements EventloopService, EventloopJmxMBeanEx {
    // The class is final, so this resolves to the same logger name as getLogger(getClass()).
    private static final Logger logger = LoggerFactory.getLogger(LogOTProcessor.class);

    private final Eventloop eventloop;
    private final LogManager<T> logManager;
    private final LogDataConsumer<T, D> logStreamConsumer;
    private final String log;
    private final List<String> partitions;
    private final LogOTState<D> state;

    // JMX-controlled switches and statistics.
    private boolean enabled = true;
    private boolean detailed;
    private final StreamStatsBasic<T> streamStatsBasic = StreamStats.basic();
    private final StreamStatsDetailed<T> streamStatsDetailed = StreamStats.detailed();
    private final StageStats stageProcessLog = StageStats.create(Duration.ofMinutes(5));
    private final StageStats stageProducer = StageStats.create(Duration.ofMinutes(5));
    private final StageStats stageConsumer = StageStats.create(Duration.ofMinutes(5));

    // NOTE(review): sharedCall presumably lets overlapping processLog() calls share a single
    // in-flight run instead of starting a second one - confirm against AsyncCallable.
    private final AsyncCallable<LogDiff<D>> processLog = AsyncCallable.sharedCall(this::doProcessLog);

    private LogOTProcessor(Eventloop eventloop, LogManager<T> logManager, LogDataConsumer<T, D> logDataConsumer, String str, List<String> list, LogOTState<D> logOTState) {
        this.eventloop = eventloop;
        this.logManager = logManager;
        this.logStreamConsumer = logDataConsumer;
        this.log = str;
        this.partitions = list;
        this.state = logOTState;
    }

    /**
     * Creates a processor.
     *
     * @param eventloop       event loop returned by {@link #getEventloop()}
     * @param logManager      source of the per-partition producer streams
     * @param logDataConsumer supplies the consumer that turns streamed records into diffs
     * @param str             log name; when {@code null} or empty, position keys are the bare partition names
     * @param list            partitions to read; the list is stored as given, not copied
     * @param logOTState      state holding the positions to resume reading from
     * @return a new processor, enabled by default
     */
    public static <T, D> LogOTProcessor<T, D> create(Eventloop eventloop, LogManager<T> logManager, LogDataConsumer<T, D> logDataConsumer, String str, List<String> list, LogOTState<D> logOTState) {
        return new LogOTProcessor<>(eventloop, logManager, logDataConsumer, str, list, logOTState);
    }

    @Override
    public Eventloop getEventloop() {
        return eventloop;
    }

    /** Nothing to start: returns an already completed stage. */
    @Override
    public Stage<Void> start() {
        // The (Void) cast keeps the result typed as Stage<Void>, matching the declared return type.
        return Stage.of((Void) null);
    }

    /** Nothing to stop: returns an already completed stage. */
    @Override
    public Stage<Void> stop() {
        return Stage.of((Void) null);
    }

    /**
     * Runs one processing pass over all partitions.
     *
     * @return a stage with the {@link LogDiff} of the pass; an empty diff (no positions, no
     *         diffs) when processing is disabled via {@link #setEnabled(boolean)}
     */
    public Stage<LogDiff<D>> processLog() {
        return processLog.call();
    }

    /**
     * Performs the actual pass: wires producer to consumer, records timing statistics for the
     * producer, the consumer and the whole pass, and combines both results into a {@link LogDiff}.
     */
    private Stage<LogDiff<D>> doProcessLog() {
        if (!enabled) {
            LogDiff<D> noChanges = LogDiff.of(Collections.emptyMap(), Collections.emptyList());
            return Stage.of(noChanges);
        }
        logger.trace("processLog_gotPositions called. Positions: {}", state.getPositions());

        StreamProducerWithResult<T, Map<String, LogDiff.LogPositionDiff>> producer = getProducer();
        StreamConsumerWithResult<T, List<D>> consumer = logStreamConsumer.consume();
        producer.getResult().whenComplete(stageProducer.recordStats());
        consumer.getResult().whenComplete(stageConsumer.recordStats());

        return producer.streamTo(consumer)
                .getResult()
                .whenComplete(stageProcessLog.recordStats())
                .thenApply(result -> LogDiff.of(result.getProducerResult(), result.getConsumerResult()))
                .whenResult(logDiff ->
                        logger.info("Log '{}' processing complete. Positions: {}", log, logDiff.getPositions()));
    }

    /**
     * Builds the merged producer over all partitions.
     *
     * <p>The producer's result is a map from position key (see {@link #logName(String)}) to the
     * {@link LogDiff.LogPositionDiff} between the position reading started from and the position
     * the partition stream reported on completion. Partitions whose position did not change are
     * left out of the map.
     */
    private StreamProducerWithResult<T, Map<String, LogDiff.LogPositionDiff>> getProducer() {
        Map<String, LogDiff.LogPositionDiff> positionDiffs = new HashMap<>();
        StagesAccumulator<Map<String, LogDiff.LogPositionDiff>> accumulator = StagesAccumulator.create(positionDiffs);
        StreamUnion<T> union = StreamUnion.create();

        for (String partition : partitions) {
            String logName = logName(partition);
            LogPosition storedPosition = state.getPositions().get(logName);
            // No stored position for this key: fall back to a LogFile built from
            // EMPTY_STRING and 0, at offset 0.
            LogPosition positionFrom = storedPosition != null
                    ? storedPosition
                    : LogPosition.create(new LogFile(AggregationPredicateGsonAdapter.EMPTY_STRING, 0), 0L);
            logger.info("Starting reading '{}' from position {}", logName, positionFrom);

            StreamProducerWithResult<T, LogPosition> partitionStream =
                    logManager.producerStream(partition, positionFrom.getLogFile(), positionFrom.getPosition(), null);
            partitionStream.streamTo(union.newInput());
            accumulator.addStage(partitionStream.getResult(), (diffs, positionTo) -> {
                if (!positionTo.equals(positionFrom)) {
                    diffs.put(logName, new LogDiff.LogPositionDiff(positionFrom, positionTo));
                }
            });
        }

        return union.getOutput()
                .with(detailed ? streamStatsDetailed : streamStatsBasic)
                .withResult(accumulator.get());
    }

    /**
     * Returns the position key for a partition: {@code log + "." + partition}, or just the
     * partition name when the log name is {@code null} or empty.
     */
    private String logName(String str) {
        return (log == null || log.isEmpty()) ? str : log + "." + str;
    }

    @JmxAttribute
    public boolean isEnabled() {
        return enabled;
    }

    /** Enables or disables processing; while disabled, {@link #processLog()} yields an empty diff. */
    @JmxAttribute
    public void setEnabled(boolean z) {
        this.enabled = z;
    }

    @JmxAttribute
    public StageStats getStageProcessLog() {
        return stageProcessLog;
    }

    @JmxAttribute
    public StageStats getStageProducer() {
        return stageProducer;
    }

    @JmxAttribute
    public StageStats getStageConsumer() {
        return stageConsumer;
    }

    // The raw return types of the two stream-stats getters are kept exactly as in the original
    // signatures, so callers and reflective JMX introspection see no change.
    @JmxAttribute
    public StreamStatsBasic getStreamStatsBasic() {
        return streamStatsBasic;
    }

    @JmxAttribute
    public StreamStatsDetailed getStreamStatsDetailed() {
        return streamStatsDetailed;
    }

    /** Makes subsequent passes attach the detailed stream statistics instead of the basic ones. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    /** Makes subsequent passes attach the basic stream statistics again. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }
}
