package org.opensearch.migrations.replay.traffic.source;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.opensearch.migrations.replay.Utils;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.tracing.ITrafficSourceContexts;
import org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource;
import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:org/opensearch/migrations/replay/traffic/source/BlockingTrafficSource.class */
/**
 * Wraps an {@link ISimpleTrafficCaptureSource} and gates reads on a time cursor.
 *
 * <p>{@link #stopReadsPast(Instant)} advances a "stop reading at" cursor (the supplied instant plus
 * {@link #getBufferTimeWindow()}). {@link #readNextTrafficStreamChunk(Supplier)} blocks, on a dedicated
 * single-thread executor, while that cursor is before the latest observation timestamp seen in
 * previously returned chunks. Until {@code stopReadsPast} has been called at least once (cursor still at
 * {@link Instant#EPOCH}), reads are never blocked.
 */
public class BlockingTrafficSource implements ITrafficCaptureSource, BufferedFlowController {
    private static final Logger log = LoggerFactory.getLogger(BlockingTrafficSource.class);

    private final ISimpleTrafficCaptureSource underlyingSource;
    private final Duration bufferTimeWindow;
    /** Reads block while this cursor is before {@link #lastTimestampSecondsRef}; EPOCH means "never set". */
    private final AtomicReference<Instant> stopReadingAtRef = new AtomicReference<>(Instant.EPOCH);
    /** Latest observation timestamp seen across all chunks returned so far. */
    private final AtomicReference<Instant> lastTimestampSecondsRef = new AtomicReference<>(Instant.EPOCH);
    /** Binary-style gate: signalers drain then release one permit; the blocked reader acquires it. */
    private final Semaphore readGate = new Semaphore(0);
    private final ExecutorService executorForBlockingActivity = Executors.newSingleThreadExecutor(
        new DefaultThreadFactory(
            "BlockingTrafficSource-executorForBlockingActivity-" + System.identityHashCode(this)));

    /**
     * @param iSimpleTrafficCaptureSource the source that reads, commits, touches and closes are delegated to
     * @param duration the buffer window added to every instant passed to {@link #stopReadsPast(Instant)}
     */
    public BlockingTrafficSource(ISimpleTrafficCaptureSource iSimpleTrafficCaptureSource, Duration duration) {
        this.underlyingSource = iSimpleTrafficCaptureSource;
        this.bufferTimeWindow = duration;
    }

    /**
     * Moves the stop-reading cursor to {@code instant + bufferTimeWindow} if that is later than the
     * current cursor and, when the cursor moved, signals any reader blocked on the gate.
     *
     * @param instant the point in time (before the buffer window is applied) that reads may proceed to
     */
    @Override // org.opensearch.migrations.replay.traffic.source.BufferedFlowController
    public void stopReadsPast(Instant instant) {
        Instant proposedStop = instant.plus(this.bufferTimeWindow);
        Instant effectiveStop = Utils.setIfLater(this.stopReadingAtRef, proposedStop);
        if (!effectiveStop.equals(proposedStop)) {
            log.atTrace()
                .setMessage("stopReadsPast: {} [buffer={}] didn't move the cursor because the value was already at {}")
                .addArgument(instant)
                .addArgument(proposedStop)
                .addArgument(effectiveStop)
                .log();
            return;
        }
        log.atTrace()
            .setMessage("Releasing the block on readNextTrafficStreamChunk and set the new stopReadingAtRef={}")
            .addArgument(effectiveStop)
            .log();
        signalReader();
    }

    /**
     * Waits (if back-pressure requires it) and then reads the next chunk from the underlying source.
     *
     * <p>The context obtained from {@code supplier} is closed when the returned future completes,
     * successfully or not. On success, the latest observation timestamp in the chunk is folded into
     * {@link #lastTimestampSecondsRef}.
     *
     * @param supplier provides the read-chunk context used for this read
     * @return a future for the next chunk as produced by the underlying source
     */
    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public CompletableFuture<List<ITrafficStreamWithKey>> readNextTrafficStreamChunk(
        Supplier<ITrafficSourceContexts.IReadChunkContext> supplier
    ) {
        ITrafficSourceContexts.IReadChunkContext readContext = supplier.get();
        log.debug("BlockingTrafficSource::readNext");
        return CompletableFuture.supplyAsync(() -> blockIfNeeded(readContext), this.executorForBlockingActivity)
            .thenCompose(ignored -> {
                log.trace("BlockingTrafficSource::composing");
                return this.underlyingSource.readNextTrafficStreamChunk(() -> readContext);
            })
            .whenComplete((chunk, failure) -> readContext.close())
            .whenComplete((chunk, failure) -> {
                if (failure != null) {
                    return;
                }
                Utils.setIfLater(this.lastTimestampSecondsRef, latestObservationTime(chunk));
                log.atTrace()
                    .setMessage("end of readNextTrafficStreamChunk trigger...lastTimestampSecondsRef={}")
                    .addArgument(this.lastTimestampSecondsRef::get)
                    .log();
            });
    }

    /**
     * Returns the greatest observation timestamp (ordered by seconds, then nanos) across every
     * sub-stream in {@code chunk}, or {@link Instant#EPOCH} when the chunk holds no observations.
     */
    private static Instant latestObservationTime(List<ITrafficStreamWithKey> chunk) {
        return chunk.stream()
            .flatMap(streamWithKey -> streamWithKey.getStream().getSubStreamList().stream())
            .map(observation -> observation.getTs())
            // A lambda passed directly to max() lets the compiler infer the timestamp type.
            .max((left, right) -> {
                int bySeconds = Long.compare(left.getSeconds(), right.getSeconds());
                return bySeconds != 0 ? bySeconds : Integer.compare(left.getNanos(), right.getNanos());
            })
            .map(TrafficStreamUtils::instantFromProtoTimestamp)
            .orElse(Instant.EPOCH);
    }

    /**
     * Blocks the calling thread while the stop-reading cursor is before the last timestamp read.
     *
     * <p>A back-pressure context is created lazily on the first iteration that actually has to wait
     * and is always closed before returning, including when an exception propagates. If the thread is
     * interrupted, the interrupt flag is restored and waiting stops: staying in the loop would make every
     * subsequent semaphore acquire throw immediately and spin the thread.
     *
     * @param iReadChunkContext the read context used to create the back-pressure context
     * @return always {@code null}; the {@code Void} type only exists so this can be used as a supplier
     */
    private Void blockIfNeeded(ITrafficSourceContexts.IReadChunkContext iReadChunkContext) {
        if (this.stopReadingAtRef.get().equals(Instant.EPOCH)) {
            return null;
        }
        log.atTrace()
            .setMessage("stopReadingAtRef={} lastTimestampSecondsRef={}")
            .addArgument(this.stopReadingAtRef)
            .addArgument(this.lastTimestampSecondsRef)
            .log();
        ITrafficSourceContexts.IBackPressureBlockContext blockContext = null;
        try {
            while (this.stopReadingAtRef.get().isBefore(this.lastTimestampSecondsRef.get())) {
                if (blockContext == null) {
                    blockContext = iReadChunkContext.createBackPressureContext();
                }
                try {
                    waitForSignalOrTouch(blockContext);
                } catch (InterruptedException e) {
                    log.atWarn().setCause(e).setMessage("Interrupted while waiting to read more data").log();
                    Thread.currentThread().interrupt();
                    // With the flag set, acquire()/tryAcquire() would throw again at once; stop waiting.
                    break;
                }
            }
        } finally {
            if (blockContext != null) {
                blockContext.close();
            }
        }
        return null;
    }

    /**
     * Performs one wait step: acquires the gate without a timeout when the underlying source needs no
     * touch, touches the source when the touch is already due, and otherwise waits on the gate for at
     * most the time remaining until the next required touch.
     *
     * @param blockContext the active back-pressure context; parent of the per-wait signal context
     * @throws InterruptedException if the thread is interrupted while waiting on the gate
     */
    private void waitForSignalOrTouch(ITrafficSourceContexts.IBackPressureBlockContext blockContext)
        throws InterruptedException {
        log.atTrace()
            .setMessage("blocking until signaled to read the next chunk last={} stop={}")
            .addArgument(this.lastTimestampSecondsRef::get)
            .addArgument(this.stopReadingAtRef::get)
            .log();
        Optional<Instant> nextRequiredTouch = this.underlyingSource.getNextRequiredTouch();
        if (nextRequiredTouch.isEmpty()) {
            log.trace("acquiring readGate semaphore (w/out timeout)");
            ITrafficSourceContexts.IWaitForNextSignal waitContext = blockContext.createWaitForSignalContext();
            try {
                this.readGate.acquire();
            } finally {
                // Close on every path so an interrupt does not leak the context.
                if (waitContext != null) {
                    waitContext.close();
                }
            }
            return;
        }

        Instant nextTouchAt = nextRequiredTouch.get();
        Instant now = Instant.now();
        long millis = Duration.between(now, nextTouchAt).toMillis();
        log.atDebug()
            .setMessage("Next touch at {} ... in {}ms (now={})")
            .addArgument(nextTouchAt)
            .addArgument(millis)
            .addArgument(now)
            .log();
        if (millis <= 0) {
            this.underlyingSource.touch(blockContext);
            return;
        }

        log.atTrace().setMessage("acquiring readGate semaphore with timeout={}").addArgument(millis).log();
        ITrafficSourceContexts.IWaitForNextSignal waitContext = blockContext.createWaitForSignalContext();
        try {
            boolean acquired = this.readGate.tryAcquire(millis, TimeUnit.MILLISECONDS);
            log.atTrace()
                .setMessage("semaphore {}")
                .addArgument(() -> (acquired ? "" : "not ") + "acquired")
                .log();
        } finally {
            if (waitContext != null) {
                waitContext.close();
            }
        }
    }

    /** Resets the gate to exactly one available permit so that a blocked reader re-checks its condition. */
    private void signalReader() {
        this.readGate.drainPermits();
        this.readGate.release();
    }

    /**
     * Commits the given stream on the underlying source. When the result is
     * {@code AFTER_NEXT_READ}, the read gate is signaled so that a blocked reader wakes up.
     *
     * @param iTrafficStreamKey key of the traffic stream to commit
     * @return the result reported by the underlying source
     * @throws IOException if the underlying source fails to commit
     */
    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource
    public ITrafficCaptureSource.CommitResult commitTrafficStream(ITrafficStreamKey iTrafficStreamKey)
        throws IOException {
        ITrafficCaptureSource.CommitResult result = this.underlyingSource.commitTrafficStream(iTrafficStreamKey);
        if (result == ITrafficCaptureSource.CommitResult.AFTER_NEXT_READ) {
            signalReader();
        }
        return result;
    }

    /**
     * Closes the underlying source and shuts down the blocking executor. The executor is shut down
     * even when closing the underlying source throws.
     *
     * @throws Exception whatever the underlying source throws from its own {@code close()}
     */
    @Override // org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource, java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            this.underlyingSource.close();
        } finally {
            this.executorForBlockingActivity.shutdown();
        }
    }

    @Override
    public String toString() {
        return new StringJoiner(", ", BlockingTrafficSource.class.getSimpleName() + "[", "]")
            .add("bufferTimeWindow=" + this.bufferTimeWindow)
            .add("lastTimestampSecondsRef=" + this.lastTimestampSecondsRef)
            .add("stopReadingAtRef=" + this.stopReadingAtRef)
            .add("readGate=" + this.readGate)
            .toString();
    }

    /** @return the buffer window added to every instant passed to {@link #stopReadsPast(Instant)} */
    @Override // org.opensearch.migrations.replay.traffic.source.BufferedFlowController
    public Duration getBufferTimeWindow() {
        return this.bufferTimeWindow;
    }
}
