package org.opensearch.migrations.replay;

import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.opensearch.migrations.replay.RequestSenderOrchestrator;
import org.opensearch.migrations.replay.datatypes.ByteBufList;
import org.opensearch.migrations.replay.datatypes.ISourceTrafficChannelKey;
import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.traffic.source.BufferedFlowController;
import org.opensearch.migrations.utils.TrackedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:org/opensearch/migrations/replay/ReplayEngine.class */
/**
 * Schedules replayed work (transformations, requests and connection closes) through a
 * {@link RequestSenderOrchestrator} and keeps a {@link BufferedFlowController} informed of how
 * far, in source time, reads may proceed.
 *
 * <p>Every scheduling call increments a count of outstanding tasks; the count is decremented when
 * the returned future completes. While no tasks are outstanding, a periodic task scheduled on the
 * orchestrator advances the controller's read limit on its own.
 */
public class ReplayEngine {
    private static final Logger log = LoggerFactory.getLogger(ReplayEngine.class);

    /** The controller's buffer time window is divided by this value to obtain the idle-update period. */
    public static final int BACKPRESSURE_UPDATE_FREQUENCY = 8;
    /** Unit used for the period of the idle-update task. */
    public static final TimeUnit TIME_UNIT_MILLIS = TimeUnit.MILLISECONDS;
    /** Transformation work is scheduled this long before the real time its source timestamp maps to. */
    public static final Duration EXPECTED_TRANSFORMATION_DURATION = Duration.ofSeconds(1);

    private final RequestSenderOrchestrator networkSendOrchestrator;
    private final BufferedFlowController contentTimeController;
    private final TimeShifter timeShifter;
    /** Handle of the periodic idle-update task created in the constructor. */
    ScheduledFuture<?> updateContentTimeControllerScheduledFuture;
    /** Number of scheduled tasks whose futures have not completed yet. */
    private final AtomicLong totalCountOfScheduledTasksOutstanding = new AtomicLong();
    /** Latest source timestamp (epoch millis) of any completed task. */
    private final AtomicLong lastCompletedSourceTimeEpochMs = new AtomicLong(0);
    /** Source timestamp (epoch millis) last pushed to the controller by the idle-update task. */
    private final AtomicLong lastIdleUpdatedTimestampEpochMs = new AtomicLong(0);

    /**
     * Creates the engine and registers the periodic idle-update task on the orchestrator.
     *
     * @param requestSenderOrchestrator orchestrator that performs the actual scheduling
     * @param bufferedFlowController controller whose read limit is advanced via {@code stopReadsPast}
     * @param timeShifter converts between source time and real time
     * @throws IllegalStateException if the controller's buffer time window is too small to yield a
     *     non-zero update period
     */
    public ReplayEngine(RequestSenderOrchestrator requestSenderOrchestrator, BufferedFlowController bufferedFlowController, TimeShifter timeShifter) {
        this.networkSendOrchestrator = requestSenderOrchestrator;
        this.contentTimeController = bufferedFlowController;
        this.timeShifter = timeShifter;
        long updatePeriodMs = getUpdatePeriodMs();
        this.updateContentTimeControllerScheduledFuture = requestSenderOrchestrator.scheduleAtFixedRate(
            this::updateContentTimeControllerWhenIdling,
            updatePeriodMs,
            updatePeriodMs,
            TIME_UNIT_MILLIS
        );
    }

    /**
     * Computes the idle-update period: the buffer time window divided by
     * {@link #BACKPRESSURE_UPDATE_FREQUENCY}, in milliseconds.
     *
     * @return the update period in milliseconds
     * @throws IllegalStateException if the computed period is zero milliseconds
     */
    private long getUpdatePeriodMs() {
        long periodMs = this.contentTimeController.getBufferTimeWindow()
            .dividedBy(BACKPRESSURE_UPDATE_FREQUENCY)
            .toMillis();
        if (periodMs == 0) {
            throw new IllegalStateException(
                "Buffer window time is too small, make it at least "
                    + BACKPRESSURE_UPDATE_FREQUENCY
                    + " "
                    + TIME_UNIT_MILLIS.name()
            );
        }
        return periodMs;
    }

    /**
     * Periodic task: when no work is outstanding, moves the controller's read limit forward.
     *
     * <p>The new limit is the current source time, capped at the most recent known progress point
     * plus one update period scaled by the time shifter's maximum rate multiplier.
     */
    private void updateContentTimeControllerWhenIdling() {
        if (isWorkOutstanding()) {
            // Completing tasks update the controller themselves.
            return;
        }
        Optional<Instant> currentSourceTime = this.timeShifter.transformRealTimeToSourceTime(Instant.now());
        if (currentSourceTime.isEmpty()) {
            // No source time can be derived yet (presumably the first timestamp has not been set -
            // confirm against TimeShifter), so there is nothing to report.
            return;
        }
        long currentSourceTimeEpochMs = currentSourceTime.get().toEpochMilli();
        long lastUpdatedEpochMs = Math.max(
            this.lastCompletedSourceTimeEpochMs.get(),
            this.lastIdleUpdatedTimestampEpochMs.get()
        );
        long maxSkipEpochMs = lastUpdatedEpochMs + (long) (getUpdatePeriodMs() * this.timeShifter.maxRateMultiplier());
        long newIdleEpochMs = Math.min(currentSourceTimeEpochMs, maxSkipEpochMs);
        this.lastIdleUpdatedTimestampEpochMs.set(newIdleEpochMs);
        this.contentTimeController.stopReadsPast(Instant.ofEpochMilli(newIdleEpochMs));
    }

    /**
     * Reports whether any scheduled task has not completed yet.
     *
     * @return {@code true} if the outstanding-task count is greater than zero
     */
    public boolean isWorkOutstanding() {
        return this.totalCountOfScheduledTasksOutstanding.get() > 0;
    }

    /**
     * Attaches completion bookkeeping to a scheduled task's future. On completion (normal or
     * exceptional) the stages, in order: record the task's source timestamp as the latest
     * completed time if it is later, decrement the outstanding-task count, tell the controller it
     * may read up to the task's source timestamp, and emit a debug log line.
     *
     * @param scheduledWork future returned by the orchestrator
     * @param sourceTimestamp source-time timestamp of the task
     * @param loggableKey object identifying the task in log output
     * @param taskDescription short label for the task type used in log output
     * @param <T> result type of the future
     * @return the future with the bookkeeping stages appended
     */
    private <T> TrackedFuture<String, T> hookWorkFinishingUpdates(TrackedFuture<String, T> scheduledWork, Instant sourceTimestamp, Object loggableKey, String taskDescription) {
        return scheduledWork.map(
            future -> future
                .whenComplete((value, failure) ->
                    Utils.setIfLater(this.lastCompletedSourceTimeEpochMs, sourceTimestamp.toEpochMilli()))
                .whenComplete((value, failure) -> {
                    // Decrement as its own statement so the bookkeeping does not hinge on logging.
                    long remaining = this.totalCountOfScheduledTasksOutstanding.decrementAndGet();
                    log.atInfo()
                        .setMessage("Scheduled task '{}' finished ({}) decremented tasksOutstanding to {}")
                        .addArgument(taskDescription)
                        .addArgument(loggableKey)
                        .addArgument(remaining)
                        .log();
                })
                .whenComplete((value, failure) -> this.contentTimeController.stopReadsPast(sourceTimestamp))
                .whenComplete((value, failure) ->
                    log.atDebug()
                        .setMessage("work finished and used timestamp={} to update contentTimeController (tasksOutstanding={})")
                        .addArgument(sourceTimestamp)
                        .addArgument(this.totalCountOfScheduledTasksOutstanding::get)
                        .log()),
            () -> "Updating fields for callers to poll progress and updating backpressure"
        );
    }

    /**
     * Logs that a task is being scheduled.
     *
     * @param loggableKey object identifying the task
     * @param outstandingCount outstanding-task count after the increment for this task
     * @param scheduledRealTime real-time instant at which the task is due to run
     * @param taskDescription short label for the task type
     */
    private static void logStartOfWork(Object loggableKey, long outstandingCount, Instant scheduledRealTime, String taskDescription) {
        log.atInfo()
            .setMessage("Scheduling '{}' ({}) to run at {} incremented tasksOutstanding to {}")
            .addArgument(taskDescription)
            .addArgument(loggableKey)
            .addArgument(scheduledRealTime)
            .addArgument(outstandingCount)
            .log();
    }

    /**
     * Schedules transformation work to run {@link #EXPECTED_TRANSFORMATION_DURATION} before the
     * real time corresponding to the given source timestamp.
     *
     * @param iReplayerHttpTransactionContext context of the transaction being processed
     * @param instant source-time timestamp of the work
     * @param supplier produces the work's future; passed through to the orchestrator
     * @param <T> result type of the work
     * @return the orchestrator's future with completion bookkeeping attached
     */
    public <T> TrackedFuture<String, T> scheduleTransformationWork(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Supplier<TrackedFuture<String, T>> supplier) {
        final String taskDescription = "processing";
        final long outstandingCount = this.totalCountOfScheduledTasksOutstanding.incrementAndGet();
        final TrackedFuture<String, T> scheduledWork;
        try {
            Instant realStart = this.timeShifter.transformSourceTimeToRealTime(instant);
            logStartOfWork(iReplayerHttpTransactionContext, outstandingCount, realStart, taskDescription);
            scheduledWork = this.networkSendOrchestrator.scheduleWork(
                iReplayerHttpTransactionContext,
                realStart.minus(EXPECTED_TRANSFORMATION_DURATION),
                supplier
            );
        } catch (RuntimeException | Error e) {
            // Nothing was scheduled, so no completion hook will ever undo the increment.
            this.totalCountOfScheduledTasksOutstanding.decrementAndGet();
            throw e;
        }
        return hookWorkFinishingUpdates(scheduledWork, instant, iReplayerHttpTransactionContext, taskDescription);
    }

    /**
     * Schedules a request whose packets are spread evenly between the real times corresponding to
     * the two source timestamps.
     *
     * @param iReplayerHttpTransactionContext context of the transaction; also supplies the request key
     * @param instant source-time timestamp of the start of the request
     * @param instant2 source-time timestamp of the end of the request
     * @param i number of packets; with more than one packet the interval between packets is the
     *     real-time span divided by {@code i - 1}, otherwise zero
     * @param byteBufList packet contents, passed through to the orchestrator
     * @param retryVisitor retry handler, passed through to the orchestrator
     * @param <T> result type of the request
     * @return the orchestrator's future with completion bookkeeping attached
     */
    public <T> TrackedFuture<String, T> scheduleRequest(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Instant instant2, int i, ByteBufList byteBufList, RequestSenderOrchestrator.RetryVisitor<T> retryVisitor) {
        final String taskDescription = "request";
        final long outstandingCount = this.totalCountOfScheduledTasksOutstanding.incrementAndGet();
        final UniqueReplayerRequestKey requestKey;
        final TrackedFuture<String, T> scheduledWork;
        try {
            Instant realStart = this.timeShifter.transformSourceTimeToRealTime(instant);
            Instant realEnd = this.timeShifter.transformSourceTimeToRealTime(instant2);
            Duration interval = i > 1 ? Duration.between(realStart, realEnd).dividedBy(i - 1) : Duration.ZERO;
            requestKey = iReplayerHttpTransactionContext.getReplayerRequestKey();
            logStartOfWork(requestKey, outstandingCount, realStart, taskDescription);
            log.atDebug()
                .setMessage("Scheduling request for {} to run from [{}, {}] with an interval of {} for {} packets")
                .addArgument(iReplayerHttpTransactionContext)
                .addArgument(realStart)
                .addArgument(realEnd)
                .addArgument(interval)
                .addArgument(i)
                .log();
            scheduledWork = this.networkSendOrchestrator.scheduleRequest(
                requestKey,
                iReplayerHttpTransactionContext,
                realStart,
                interval,
                byteBufList,
                retryVisitor
            );
        } catch (RuntimeException | Error e) {
            // Nothing was scheduled, so no completion hook will ever undo the increment.
            this.totalCountOfScheduledTasksOutstanding.decrementAndGet();
            throw e;
        }
        return hookWorkFinishingUpdates(scheduledWork, instant, requestKey, taskDescription);
    }

    /**
     * Schedules the close of a connection at the real time corresponding to the given source
     * timestamp.
     *
     * @param i index of the channel interaction; paired with the channel key for logging and
     *     forwarded to the orchestrator
     * @param iChannelKeyContext context supplying the channel key
     * @param i2 forwarded unchanged as the second argument of {@code scheduleClose}
     *     (presumably a session number - confirm against RequestSenderOrchestrator)
     * @param instant source-time timestamp of the close
     * @return the orchestrator's future with completion bookkeeping attached
     */
    public TrackedFuture<String, Void> closeConnection(int i, IReplayContexts.IChannelKeyContext iChannelKeyContext, int i2, Instant instant) {
        final String taskDescription = "close";
        final long outstandingCount = this.totalCountOfScheduledTasksOutstanding.incrementAndGet();
        final ISourceTrafficChannelKey channelKey;
        final TrackedFuture<String, Void> scheduledClose;
        try {
            Instant realCloseTime = this.timeShifter.transformSourceTimeToRealTime(instant);
            channelKey = iChannelKeyContext.getChannelKey();
            logStartOfWork(new IndexedChannelInteraction(channelKey, i), outstandingCount, realCloseTime, taskDescription);
            scheduledClose = this.networkSendOrchestrator.scheduleClose(iChannelKeyContext, i2, i, realCloseTime);
        } catch (RuntimeException | Error e) {
            // Nothing was scheduled, so no completion hook will ever undo the increment.
            this.totalCountOfScheduledTasksOutstanding.decrementAndGet();
            throw e;
        }
        return hookWorkFinishingUpdates(scheduledClose, instant, channelKey, taskDescription);
    }

    /**
     * Passes the first source timestamp on to the time shifter.
     *
     * @param instant first source-time timestamp
     */
    public void setFirstTimestamp(Instant instant) {
        this.timeShifter.setFirstTimestamp(instant);
    }
}
