package org.opensearch.migrations.replay.traffic.source;

import java.io.EOFException;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey;
import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamAndKey;
import org.opensearch.migrations.replay.tracing.ITrafficSourceContexts;
import org.opensearch.migrations.replay.traffic.source.ITrafficCaptureSource;
import org.opensearch.migrations.tracing.TestContext;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/traffic/source/ArrayCursorTrafficCaptureSource.class */
public class ArrayCursorTrafficCaptureSource implements ISimpleTrafficCaptureSource {
    private static final Logger log;
    final AtomicInteger readCursor;
    final PriorityQueue<TrafficStreamCursorKey> pQueue = new PriorityQueue<>();
    Integer cursorHighWatermark;
    ArrayCursorTrafficSourceContext arrayCursorTrafficSourceContext;
    TestContext rootContext;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ArrayCursorTrafficCaptureSource(TestContext testContext, ArrayCursorTrafficSourceContext arrayCursorTrafficSourceContext) {
        int i = arrayCursorTrafficSourceContext.nextReadCursor.get();
        log.info("startingCursor = " + i);
        this.readCursor = new AtomicInteger(i);
        this.arrayCursorTrafficSourceContext = arrayCursorTrafficSourceContext;
        this.cursorHighWatermark = Integer.valueOf(i);
        this.rootContext = testContext;
    }

    public CompletableFuture<List<ITrafficStreamWithKey>> readNextTrafficStreamChunk(Supplier<ITrafficSourceContexts.IReadChunkContext> supplier) {
        int andIncrement = this.readCursor.getAndIncrement();
        log.info("reading chunk from index=" + andIncrement);
        if (this.arrayCursorTrafficSourceContext.trafficStreamsList.size() <= andIncrement) {
            return CompletableFuture.failedFuture(new EOFException());
        }
        TrafficStream trafficStream = this.arrayCursorTrafficSourceContext.trafficStreamsList.get(andIncrement);
        TrafficStreamCursorKey trafficStreamCursorKey = new TrafficStreamCursorKey(this.rootContext, trafficStream, andIncrement);
        synchronized (this.pQueue) {
            this.pQueue.add(trafficStreamCursorKey);
            this.cursorHighWatermark = Integer.valueOf(andIncrement);
        }
        return CompletableFuture.supplyAsync(() -> {
            return List.of(new PojoTrafficStreamAndKey(trafficStream, trafficStreamCursorKey));
        });
    }

    public ITrafficCaptureSource.CommitResult commitTrafficStream(ITrafficStreamKey iTrafficStreamKey) {
        synchronized (this.pQueue) {
            log.info("Commit called for " + iTrafficStreamKey + " with pQueue.size=" + this.pQueue.size());
            int i = ((TrafficStreamCursorKey) iTrafficStreamKey).arrayIndex;
            int i2 = this.pQueue.peek().arrayIndex;
            boolean remove = this.pQueue.remove(iTrafficStreamKey);
            if (!remove) {
                log.error("no item " + i + " to remove from " + this.pQueue);
            }
            if (!$assertionsDisabled && !remove) {
                throw new AssertionError();
            }
            if (i2 == i) {
                int intValue = ((Integer) Optional.ofNullable(this.pQueue.peek()).map(trafficStreamCursorKey -> {
                    return Integer.valueOf(trafficStreamCursorKey.getArrayIndex());
                }).orElse(Integer.valueOf(this.cursorHighWatermark.intValue() + 1))).intValue();
                log.info("Commit called for " + iTrafficStreamKey + ", and new topCursor=" + intValue);
                this.arrayCursorTrafficSourceContext.nextReadCursor.set(intValue);
            } else {
                log.info("Commit called for " + iTrafficStreamKey + ", but topCursor=" + i2);
            }
        }
        this.rootContext.channelContextManager.releaseContextFor(((TrafficStreamCursorKey) iTrafficStreamKey).trafficStreamsContext.getChannelKeyContext());
        return ITrafficCaptureSource.CommitResult.Immediate;
    }

    static {
        $assertionsDisabled = !ArrayCursorTrafficCaptureSource.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(ArrayCursorTrafficCaptureSource.class);
    }
}
