package io.activej.etl;

import io.activej.async.AsyncAccumulator;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.service.ReactiveService;
import io.activej.datastream.processor.StreamUnion;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.datastream.supplier.StreamSupplierWithResult;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.multilog.IMultilog;
import io.activej.multilog.LogFile;
import io.activej.multilog.LogPosition;
import io.activej.ot.OTState;
import io.activej.ot.StateManager;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/etl/LogProcessor.class */
/**
 * Reads one named log from an {@link IMultilog} across a fixed list of partitions,
 * streams every record into an {@link ILogDataConsumer}, and reports the outcome as
 * a {@link LogDiff}.
 *
 * <p>For each partition the reading starts at the position currently stored in the
 * {@link LogState} held by the {@link StateManager} (or at {@link LogPosition#initial()}
 * when no position is stored). The resulting {@link LogDiff} carries, per partition log,
 * the pair "position before / position after" for every partition whose position moved,
 * together with the list produced by the consumer.
 *
 * <p>The processor also exposes JMX attributes and operations for enabling/disabling
 * processing and for switching between basic and detailed stream statistics.
 *
 * @param <T> type of the records stored in the log
 * @param <P> type of the items produced by the log data consumer
 * @param <D> diff type of the {@link LogState} queried for positions
 */
public final class LogProcessor<T, P, D> extends AbstractReactive implements ReactiveService, ReactiveJmxBeanWithStats {
    private static final Logger logger = LoggerFactory.getLogger(LogProcessor.class);

    private final IMultilog<T> multilog;
    private final ILogDataConsumer<T, P> logStreamConsumer;

    /** Log name used as a prefix of partition log names; may be {@code null} or empty (see {@link #logName}). */
    private final String log;
    private final List<String> partitions;

    private final StateManager<?, LogState<D, ?>> stateManager;

    // JMX-controlled state
    private boolean enabled = true;
    private boolean detailed;
    private final BasicStreamStats<T> streamStatsBasic = StreamStats.basic();
    private final DetailedStreamStats<T> streamStatsDetailed = StreamStats.detailed();
    private final PromiseStats promiseProcessLog = PromiseStats.create(Duration.ofMinutes(5));

    // NOTE(review): AsyncSuppliers.reuse presumably lets overlapping processLog() calls
    // share a single in-flight run - confirm against the ActiveJ documentation.
    private final AsyncSupplier<LogDiff<P>> processLog = AsyncSuppliers.reuse(this::doProcessLog);

    private LogProcessor(
            Reactor reactor,
            IMultilog<T> multilog,
            ILogDataConsumer<T, P> logStreamConsumer,
            String log,
            List<String> partitions,
            StateManager<?, LogState<D, ?>> stateManager
    ) {
        super(reactor);
        this.multilog = multilog;
        this.logStreamConsumer = logStreamConsumer;
        this.log = log;
        this.partitions = partitions;
        this.stateManager = stateManager;
    }

    /**
     * Creates a log processor.
     *
     * @param reactor           reactor this processor is bound to
     * @param multilog          source of log records
     * @param logStreamConsumer consumer that receives the records and produces the diffs
     * @param log               log name; when {@code null} or empty, partition names are used as log names as-is
     * @param partitions        partitions to read on every {@link #processLog()} run
     * @param stateManager      state manager whose {@link LogState} supplies the stored log positions
     * @return a new processor, enabled and using basic stream statistics
     */
    public static <T, P, D, S extends OTState<D>> LogProcessor<T, P, D> create(
            Reactor reactor,
            IMultilog<T> multilog,
            ILogDataConsumer<T, P> logStreamConsumer,
            String log,
            List<String> partitions,
            StateManager<LogDiff<D>, LogState<D, S>> stateManager
    ) {
        // Generics are invariant, so StateManager<LogDiff<D>, LogState<D, S>> is not a
        // StateManager<?, LogState<D, ?>>. The processor only reads positions from the state,
        // so widening the state type through a raw cast is safe here.
        @SuppressWarnings({"unchecked", "rawtypes"})
        StateManager<?, LogState<D, ?>> widenedStateManager = (StateManager) stateManager;
        return new LogProcessor<>(reactor, multilog, logStreamConsumer, log, partitions, widenedStateManager);
    }

    /** Nothing to initialise; completes immediately. Must be called from the reactor thread. */
    @Override
    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    /** Nothing to release; completes immediately. Must be called from the reactor thread. */
    @Override
    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    /**
     * Processes the log from the currently stored positions.
     * Must be called from the reactor thread.
     *
     * @return promise of the {@link LogDiff} describing how far each partition advanced
     *         and what the consumer produced
     */
    public Promise<LogDiff<P>> processLog() {
        Reactive.checkInReactorThread(this);
        return processLog.get();
    }

    /**
     * Performs a single processing run: queries the stored positions, streams all partitions
     * into the consumer and converts the result into a {@link LogDiff}.
     * When processing is disabled, immediately yields a diff with no positions and no items.
     */
    private Promise<LogDiff<P>> doProcessLog() {
        if (!enabled) {
            return Promise.of(LogDiff.of(Map.of(), List.of()));
        }

        // Snapshot the positions so later state changes cannot affect this run.
        Map<String, LogPosition> positions = stateManager.query(state -> Map.copyOf(state.getPositions()));
        logger.trace("processLog_gotPositions called. Positions: {}", positions);

        return getSupplier(positions)
                .streamTo(logStreamConsumer.consume())
                .whenComplete(promiseProcessLog.recordStats())
                .map(result -> LogDiff.of(result.value1(), result.value2()))
                .whenResult(logDiff ->
                        logger.info("Log '{}' processing complete. Positions: {}", log, logDiff.getPositions()));
    }

    /**
     * Builds a single supplier that merges the record streams of all partitions and whose
     * result is a map "partition log name -> position diff" for the partitions that advanced.
     *
     * @param positions stored positions keyed by partition log name
     */
    private StreamSupplierWithResult<T, Map<String, LogPositionDiff>> getSupplier(Map<String, LogPosition> positions) {
        AsyncAccumulator<Map<String, LogPositionDiff>> positionDiffs = AsyncAccumulator.create(new HashMap<>());
        StreamUnion<T> union = StreamUnion.create();

        for (String partition : partitions) {
            String logName = logName(partition);
            LogPosition stored = positions.get(logName);
            // Effectively-final copy for the lambda below; falls back to the initial position.
            LogPosition from = stored != null ? stored : LogPosition.initial();
            logger.info("Starting reading '{}' from position {}", logName, from);

            positionDiffs.addPromise(
                    StreamSupplierWithResult
                            .ofPromise(multilog.read(partition, from.getLogFile(), from.getPosition(), (LogFile) null))
                            .streamTo(union.newInput()),
                    (diffs, to) -> {
                        // Partitions whose position did not move are left out of the diff.
                        if (!to.equals(from)) {
                            diffs.put(logName, new LogPositionDiff(from, to));
                        }
                    });
        }

        // The statistics mode is chosen once per run, when the pipeline is assembled.
        StreamStats<T> stats = detailed ? streamStatsDetailed : streamStatsBasic;
        return StreamSupplierWithResult.of(union.getOutput().transformWith(stats), positionDiffs.run());
    }

    /** Returns {@code "<log>.<partition>"}, or just the partition when the log name is {@code null} or empty. */
    private String logName(String partition) {
        if (log == null || log.isEmpty()) {
            return partition;
        }
        return log + "." + partition;
    }

    @JmxAttribute
    public boolean isEnabled() {
        return enabled;
    }

    /** Enables or disables processing; while disabled, a processing run yields an empty diff. */
    @JmxAttribute
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    @JmxAttribute
    public PromiseStats getPromiseProcessLog() {
        return promiseProcessLog;
    }

    // Return type deliberately kept raw so the exposed getter signature stays unchanged.
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public BasicStreamStats getStreamStatsBasic() {
        return streamStatsBasic;
    }

    // Return type deliberately kept raw so the exposed getter signature stays unchanged.
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public DetailedStreamStats getStreamStatsDetailed() {
        return streamStatsDetailed;
    }

    /** Makes subsequent processing runs collect detailed stream statistics. */
    @JmxOperation
    public void startDetailedMonitoring() {
        detailed = true;
    }

    /** Makes subsequent processing runs collect basic stream statistics. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        detailed = false;
    }
}
