package io.activej.etl;

import io.activej.async.AsyncAccumulator;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.service.EventloopService;
import io.activej.common.initializer.WithInitializer;
import io.activej.datastream.StreamSupplier;
import io.activej.datastream.StreamSupplierWithResult;
import io.activej.datastream.processor.StreamUnion;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsBasic;
import io.activej.datastream.stats.StreamStatsDetailed;
import io.activej.eventloop.Eventloop;
import io.activej.eventloop.jmx.EventloopJmxBeanWithStats;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.multilog.LogFile;
import io.activej.multilog.LogPosition;
import io.activej.multilog.Multilog;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/etl/LogOTProcessor.class */
/**
 * Reads items of type {@code T} from a {@link Multilog}, one stream per configured partition,
 * feeds the merged stream into a {@link LogDataConsumer} and reports the outcome as a
 * {@link LogDiff}: the log position changes per partition plus the diffs of type {@code D}
 * produced by the consumer.
 *
 * <p>Starting positions are taken from the supplied {@link LogOTState}; a partition with no
 * stored position starts from {@link LogPosition#initial()}.
 *
 * <p>Processing can be switched off and stream statistics can be toggled between basic and
 * detailed through the JMX attributes and operations exposed by this class.
 *
 * @param <T> type of the items stored in the log
 * @param <D> type of the diffs produced by the log consumer
 */
public final class LogOTProcessor<T, D> implements EventloopService, EventloopJmxBeanWithStats, WithInitializer<LogOTProcessor<T, D>> {
    private static final Logger logger = LoggerFactory.getLogger(LogOTProcessor.class);

    private final Eventloop eventloop;
    private final Multilog<T> multilog;
    private final LogDataConsumer<T, D> logStreamConsumer;

    /** Optional prefix for log names; when {@code null} or empty the partition name is used as is. */
    private final String log;
    private final List<String> partitions;

    private final LogOTState<D> state;

    // JMX-controlled flags and statistics
    private boolean detailed;
    private boolean enabled = true;
    private final StreamStatsBasic<T> streamStatsBasic = StreamStats.basic();
    private final StreamStatsDetailed<T> streamStatsDetailed = StreamStats.detailed();
    private final PromiseStats promiseProcessLog = PromiseStats.create(Duration.ofMinutes(5));

    // NOTE(review): AsyncSuppliers.reuse presumably lets concurrent callers share a single
    // in-flight doProcessLog() run - confirm against the AsyncSuppliers documentation.
    private final AsyncSupplier<LogDiff<D>> processLog = AsyncSuppliers.reuse(this::doProcessLog);

    private LogOTProcessor(Eventloop eventloop, Multilog<T> multilog, LogDataConsumer<T, D> logStreamConsumer,
            String log, List<String> partitions, LogOTState<D> state) {
        this.eventloop = eventloop;
        this.multilog = multilog;
        this.logStreamConsumer = logStreamConsumer;
        this.log = log;
        this.partitions = partitions;
        this.state = state;
    }

    /**
     * Creates a new processor.
     *
     * @param eventloop       eventloop returned by {@link #getEventloop()}
     * @param multilog        source the partitions are read from
     * @param logDataConsumer consumer that receives the merged item stream and yields the diffs
     * @param str             log name prefix; may be {@code null} or empty, in which case
     *                        partition names are used unprefixed
     * @param list            partitions to read on every {@link #processLog()} call
     * @param logOTState      state holding the last known position of every log
     * @return the new processor instance
     */
    public static <T, D> LogOTProcessor<T, D> create(Eventloop eventloop, Multilog<T> multilog, LogDataConsumer<T, D> logDataConsumer, String str, List<String> list, LogOTState<D> logOTState) {
        return new LogOTProcessor<>(eventloop, multilog, logDataConsumer, str, list, logOTState);
    }

    @NotNull
    @Override
    public Eventloop getEventloop() {
        return eventloop;
    }

    /** Nothing to initialise; returns an already completed promise. */
    @NotNull
    @Override
    public Promise<Void> start() {
        return Promise.complete();
    }

    /** Nothing to release; returns an already completed promise. */
    @NotNull
    @Override
    public Promise<Void> stop() {
        return Promise.complete();
    }

    /**
     * Runs log processing through the {@code processLog} supplier, which delegates to
     * {@link #doProcessLog()}.
     *
     * @return promise of the resulting {@link LogDiff}
     */
    public Promise<LogDiff<D>> processLog() {
        return processLog.get();
    }

    /**
     * Streams all partitions into the log consumer and converts the result into a {@link LogDiff}.
     * When processing is disabled an empty diff is returned without reading anything.
     */
    @NotNull
    private Promise<LogDiff<D>> doProcessLog() {
        if (!enabled) {
            return Promise.of(LogDiff.of(Collections.emptyMap(), Collections.emptyList()));
        }
        logger.trace("processLog_gotPositions called. Positions: {}", state.getPositions());

        return getSupplier()
                .streamTo(logStreamConsumer.consume())
                .whenComplete(promiseProcessLog.recordStats())
                // value1 - position diffs gathered by the supplier, value2 - diffs from the consumer
                .map(result -> LogDiff.of(result.getValue1(), result.getValue2()))
                .whenResult(logDiff ->
                        logger.info("Log '{}' processing complete. Positions: {}", log, logDiff.getPositions()));
    }

    /**
     * Builds a single supplier that merges the streams of all partitions. Its result is a map
     * from log name to {@link LogPositionDiff}, containing only the logs whose position changed.
     */
    private StreamSupplierWithResult<T, Map<String, LogPositionDiff>> getSupplier() {
        AsyncAccumulator<Map<String, LogPositionDiff>> positionDiffs = AsyncAccumulator.create(new HashMap<>());
        StreamUnion<T> union = StreamUnion.create();

        for (String partition : partitions) {
            String name = logName(partition);
            LogPosition stored = state.getPositions().get(name);
            LogPosition positionFrom = stored != null ? stored : LogPosition.initial();
            logger.info("Starting reading '{}' from position {}", name, positionFrom);

            // NOTE(review): the last argument is passed as null - presumably "no end file";
            // confirm against Multilog.read.
            Promise<LogPosition> reading = StreamSupplierWithResult
                    .ofPromise(multilog.read(partition, positionFrom.getLogFile(), positionFrom.getPosition(), (LogFile) null))
                    .streamTo(union.newInput());

            positionDiffs.addPromise(reading, (positions, positionTo) -> {
                // record a diff only when the position actually changed
                if (!positionTo.equals(positionFrom)) {
                    positions.put(name, new LogPositionDiff(positionFrom, positionTo));
                }
            });
        }

        // the statistics flavour is chosen once, at the moment the supplier is built
        StreamStats<T> stats = detailed ? streamStatsDetailed : streamStatsBasic;
        return StreamSupplierWithResult.of(union.getOutput().transformWith(stats), positionDiffs.run().promise());
    }

    /** Returns {@code log + "." + str}, or just {@code str} when no log prefix is configured. */
    private String logName(String str) {
        return (log == null || log.isEmpty()) ? str : log + "." + str;
    }

    @JmxAttribute
    public boolean isEnabled() {
        return enabled;
    }

    /**
     * Enables or disables processing; while disabled, {@link #doProcessLog()} yields an empty diff.
     *
     * @param z {@code true} to enable processing, {@code false} to disable it
     */
    @JmxAttribute
    public void setEnabled(boolean z) {
        this.enabled = z;
    }

    @JmxAttribute
    public PromiseStats getPromiseProcessLog() {
        return promiseProcessLog;
    }

    // The raw return type is kept to preserve the existing signature.
    // NOTE(review): confirm whether the JMX layer relies on it before parameterising.
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsBasic getStreamStatsBasic() {
        return streamStatsBasic;
    }

    // The raw return type is kept to preserve the existing signature (see note above the
    // basic-stats getter's counterpart reasoning: JMX dependence is unconfirmed).
    @SuppressWarnings("rawtypes")
    @JmxAttribute
    public StreamStatsDetailed getStreamStatsDetailed() {
        return streamStatsDetailed;
    }

    /** Makes subsequently built suppliers use the detailed stream statistics. */
    @JmxOperation
    public void startDetailedMonitoring() {
        detailed = true;
    }

    /** Makes subsequently built suppliers use the basic stream statistics. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        detailed = false;
    }
}
