package org.opensearch.migrations.replay;

import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.opensearch.migrations.replay.TrafficReplayer;
import org.opensearch.migrations.replay.TrafficReplayerCore;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.http.retries.OpenSearchDefaultRetry;
import org.opensearch.migrations.replay.http.retries.RetryCollectingVisitorFactory;
import org.opensearch.migrations.replay.tracing.IRootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource;
import org.opensearch.migrations.replay.traffic.source.TrafficStreamLimiter;
import org.opensearch.migrations.transform.IAuthTransformerFactory;
import org.opensearch.migrations.transform.IJsonTransformer;
import org.opensearch.migrations.utils.TextTrackedFuture;
import org.opensearch.migrations.utils.TrackedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerTopLevel.class */
public class TrafficReplayerTopLevel extends TrafficReplayerCore implements AutoCloseable {
    private static final Logger log;
    public static final String TARGET_CONNECTION_POOL_NAME = "targetConnectionPool";
    public static final int MAX_ITEMS_TO_SHOW_FOR_LEFTOVER_WORK_AT_INFO_LEVEL = 10;
    public static final AtomicInteger targetConnectionPoolUniqueCounter;
    private final AtomicReference<TextTrackedFuture<Void>> allRemainingWorkFutureOrShutdownSignalRef;
    protected final ClientConnectionPool clientConnectionPool;
    private final AtomicReference<Error> shutdownReasonRef;
    private final AtomicReference<CompletableFuture<Void>> shutdownFutureRef;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerTopLevel$ConcurrentHashMapWorkTracker.class */
    static class ConcurrentHashMapWorkTracker<T> implements IStreamableWorkTracker<T> {
        ConcurrentHashMap<UniqueReplayerRequestKey, TrackedFuture<String, T>> map = new ConcurrentHashMap<>();

        ConcurrentHashMapWorkTracker() {
        }

        @Override // org.opensearch.migrations.replay.TrafficReplayerCore.IWorkTracker
        public void put(UniqueReplayerRequestKey uniqueReplayerRequestKey, TrackedFuture<String, T> trackedFuture) {
            this.map.put(uniqueReplayerRequestKey, trackedFuture);
        }

        @Override // org.opensearch.migrations.replay.TrafficReplayerCore.IWorkTracker
        public void remove(UniqueReplayerRequestKey uniqueReplayerRequestKey) {
            this.map.remove(uniqueReplayerRequestKey);
        }

        @Override // org.opensearch.migrations.replay.TrafficReplayerCore.IWorkTracker
        public boolean isEmpty() {
            return this.map.isEmpty();
        }

        @Override // org.opensearch.migrations.replay.TrafficReplayerCore.IWorkTracker
        public int size() {
            return this.map.size();
        }

        @Override // org.opensearch.migrations.replay.TrafficReplayerTopLevel.IStreamableWorkTracker
        public Stream<Map.Entry<UniqueReplayerRequestKey, TrackedFuture<String, T>>> getRemainingItems() {
            return this.map.entrySet().stream();
        }
    }

    /* loaded from: input_file:org/opensearch/migrations/replay/TrafficReplayerTopLevel$IStreamableWorkTracker.class */
    public interface IStreamableWorkTracker<T> extends TrafficReplayerCore.IWorkTracker<T> {
        Stream<Map.Entry<UniqueReplayerRequestKey, TrackedFuture<String, T>>> getRemainingItems();
    }

    public TrafficReplayerTopLevel(IRootReplayerContext iRootReplayerContext, URI uri, IAuthTransformerFactory iAuthTransformerFactory, IJsonTransformer iJsonTransformer, ClientConnectionPool clientConnectionPool, TrafficStreamLimiter trafficStreamLimiter, IStreamableWorkTracker<Void> iStreamableWorkTracker) {
        super(iRootReplayerContext, uri, iAuthTransformerFactory, iJsonTransformer, trafficStreamLimiter, iStreamableWorkTracker, new RetryCollectingVisitorFactory(new OpenSearchDefaultRetry()));
        this.clientConnectionPool = clientConnectionPool;
        this.allRemainingWorkFutureOrShutdownSignalRef = new AtomicReference<>();
        this.shutdownReasonRef = new AtomicReference<>();
        this.shutdownFutureRef = new AtomicReference<>();
    }

    public static ClientConnectionPool makeNettyPacketConsumerConnectionPool(URI uri, boolean z, int i) {
        return makeNettyPacketConsumerConnectionPool(uri, z, i, null);
    }

    public static ClientConnectionPool makeNettyPacketConsumerConnectionPool(URI uri, boolean z, int i, String str) {
        return new ClientConnectionPool(NettyPacketToHttpConsumer.createClientConnectionFactory(loadSslContext(uri, z), uri), str != null ? str : getTargetConnectionPoolName(targetConnectionPoolUniqueCounter.getAndIncrement()), i);
    }

    public static String getTargetConnectionPoolName(int i) {
        return "targetConnectionPool" + (i == 0 ? "" : Integer.toString(i));
    }

    public static SslContext loadSslContext(URI uri, boolean z) {
        if (!uri.getScheme().equalsIgnoreCase("https")) {
            return null;
        }
        SslContextBuilder forClient = SslContextBuilder.forClient();
        if (z) {
            forClient.trustManager(InsecureTrustManagerFactory.INSTANCE);
        }
        return forClient.build();
    }

    public void setupRunAndWaitForReplayToFinish(Duration duration, Duration duration2, BlockingTrafficSource blockingTrafficSource, TimeShifter timeShifter, Consumer<SourceTargetCaptureTuple> consumer) throws InterruptedException, ExecutionException {
        ReplayEngine replayEngine = new ReplayEngine(new RequestSenderOrchestrator(this.clientConnectionPool, (connectionReplaySession, iReplayerHttpTransactionContext) -> {
            return new NettyPacketToHttpConsumer(connectionReplaySession, iReplayerHttpTransactionContext, duration2);
        }), blockingTrafficSource, timeShifter);
        CapturedTrafficToHttpTransactionAccumulator capturedTrafficToHttpTransactionAccumulator = new CapturedTrafficToHttpTransactionAccumulator(duration, "(see command line option --packet-timeout-seconds)", new TrafficReplayerCore.TrafficReplayerAccumulationCallbacks(replayEngine, consumer, blockingTrafficSource));
        try {
            try {
                pullCaptureFromSourceToAccumulator(blockingTrafficSource, capturedTrafficToHttpTransactionAccumulator);
                capturedTrafficToHttpTransactionAccumulator.close();
                wrapUpWorkAndEmitSummary(replayEngine, capturedTrafficToHttpTransactionAccumulator);
                if (!$assertionsDisabled && this.shutdownFutureRef.get() == null && !this.requestWorkTracker.isEmpty()) {
                    throw new AssertionError("expected to wait for all the in flight requests to fully flush and self destruct themselves");
                }
            } catch (InterruptedException e) {
                throw e;
            } catch (Exception e2) {
                log.atWarn().setCause(e2).setMessage("Terminating runReplay due to exception").log();
                throw e2;
            }
        } catch (Throwable th) {
            capturedTrafficToHttpTransactionAccumulator.close();
            wrapUpWorkAndEmitSummary(replayEngine, capturedTrafficToHttpTransactionAccumulator);
            if (!$assertionsDisabled && this.shutdownFutureRef.get() == null && !this.requestWorkTracker.isEmpty()) {
                throw new AssertionError("expected to wait for all the in flight requests to fully flush and self destruct themselves");
            }
            throw th;
        }
    }

    protected void wrapUpWorkAndEmitSummary(ReplayEngine replayEngine, CapturedTrafficToHttpTransactionAccumulator capturedTrafficToHttpTransactionAccumulator) throws ExecutionException, InterruptedException {
        Level level = Level.INFO;
        Level level2 = Level.WARN;
        Level level3 = level;
        Duration ofSeconds = Duration.ofSeconds(60L);
        while (true) {
            Duration duration = ofSeconds;
            if (this.shutdownFutureRef.get() != null) {
                log.warn("Not waiting for work because the TrafficReplayer is shutting down.");
                break;
            }
            try {
                waitForRemainingWork(level3, duration);
                break;
            } catch (TimeoutException e) {
                log.atLevel(level3).log("Timed out while waiting for the remaining requests to be finalized...");
                level3 = level2;
                ofSeconds = duration.multipliedBy(2L);
            }
        }
        if (!this.requestWorkTracker.isEmpty() || this.exceptionRequestCount.get() > 0) {
            LoggingEventBuilder message = log.atWarn().setMessage("{} in-flight requests being dropped due to pending shutdown; {} requests to the target threw an exception; {} requests were successfully processed.");
            TrafficReplayerCore.IWorkTracker<Void> iWorkTracker = this.requestWorkTracker;
            Objects.requireNonNull(iWorkTracker);
            LoggingEventBuilder addArgument = message.addArgument(iWorkTracker::size);
            AtomicInteger atomicInteger = this.exceptionRequestCount;
            Objects.requireNonNull(atomicInteger);
            LoggingEventBuilder addArgument2 = addArgument.addArgument(atomicInteger::get);
            AtomicInteger atomicInteger2 = this.successfulRequestCount;
            Objects.requireNonNull(atomicInteger2);
            addArgument2.addArgument(atomicInteger2::get).log();
        } else {
            log.info(this.successfulRequestCount.get() + " requests were successfully processed.");
        }
        log.info("# of connections created: {}; # of requests on reused keep-alive connections: {}; # of expired connections: {}; # of connections closed: {}; # of connections terminated upon accumulator termination: {}", new Object[]{Integer.valueOf(capturedTrafficToHttpTransactionAccumulator.numberOfConnectionsCreated()), Integer.valueOf(capturedTrafficToHttpTransactionAccumulator.numberOfRequestsOnReusedConnections()), Integer.valueOf(capturedTrafficToHttpTransactionAccumulator.numberOfConnectionsExpired()), Integer.valueOf(capturedTrafficToHttpTransactionAccumulator.numberOfConnectionsClosed()), Integer.valueOf(capturedTrafficToHttpTransactionAccumulator.numberOfRequestsTerminatedUponAccumulatorClose())});
    }

    public void setupRunAndWaitForReplayWithShutdownChecks(Duration duration, Duration duration2, BlockingTrafficSource blockingTrafficSource, TimeShifter timeShifter, Consumer<SourceTargetCaptureTuple> consumer) throws TrafficReplayer.TerminationException, ExecutionException, InterruptedException {
        try {
            setupRunAndWaitForReplayToFinish(duration, duration2, blockingTrafficSource, timeShifter, consumer);
            if (this.shutdownReasonRef.get() != null) {
                throw new TrafficReplayer.TerminationException(this.shutdownReasonRef.get(), null);
            }
            shutdown(null).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TrafficReplayer.TerminationException(this.shutdownReasonRef.get(), e);
        } catch (Throwable th) {
            throw new TrafficReplayer.TerminationException(this.shutdownReasonRef.get(), th);
        }
    }

    protected void waitForRemainingWork(Level level, @NonNull Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
        if (duration == null) {
            throw new NullPointerException("timeout is marked non-null but is null");
        }
        if (!this.liveTrafficStreamLimiter.isStopped()) {
            CompletableFuture completableFuture = new CompletableFuture();
            this.liveTrafficStreamLimiter.queueWork(1, null, workItem -> {
                completableFuture.complete(null);
                this.liveTrafficStreamLimiter.doneProcessing(workItem);
            });
            completableFuture.get(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        Map.Entry[] entryArr = (Map.Entry[]) ((IStreamableWorkTracker) this.requestWorkTracker).getRemainingItems().toArray(i -> {
            return new Map.Entry[i];
        });
        writeStatusLogsForRemainingWork(level, entryArr);
        TextTrackedFuture<Void> allOf = TextTrackedFuture.allOf((TrackedFuture[]) Arrays.stream(entryArr).map((v0) -> {
            return v0.getValue();
        }).toArray(i2 -> {
            return new TrackedFuture[i2];
        }), () -> {
            return "TrafficReplayer.AllWorkFinished";
        });
        try {
            try {
                if (this.allRemainingWorkFutureOrShutdownSignalRef.compareAndSet(null, allOf)) {
                    allOf.get(duration);
                } else {
                    handleAlreadySetFinishedSignal();
                }
                this.allRemainingWorkFutureOrShutdownSignalRef.set(null);
            } catch (TimeoutException e) {
                if (allOf.future.cancel(true)) {
                    throw e;
                }
                if (!$assertionsDisabled && !allOf.future.isDone()) {
                    throw new AssertionError("expected future to have finished if cancel didn't succeed");
                }
                this.allRemainingWorkFutureOrShutdownSignalRef.set(null);
            }
        } catch (Throwable th) {
            this.allRemainingWorkFutureOrShutdownSignalRef.set(null);
            throw th;
        }
    }

    private void handleAlreadySetFinishedSignal() throws InterruptedException, ExecutionException {
        try {
            CompletableFuture completableFuture = this.allRemainingWorkFutureOrShutdownSignalRef.get().future;
            if (!$assertionsDisabled && !completableFuture.isDone()) {
                throw new AssertionError("Expected this reference to be EITHER the current work futures or a sentinel value indicating a shutdown has commenced.  The signal, when set, should have been completed at the time that the reference was set");
            }
            completableFuture.get();
            log.debug("Did shutdown cleanly");
        } catch (Error e) {
            log.atError().setCause(e).setMessage("Not waiting for all work to finish.  The TrafficReplayer is shutting down").log();
            throw e;
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (!(cause instanceof Error)) {
                throw e2;
            }
            throw ((Error) cause);
        }
    }

    protected static void writeStatusLogsForRemainingWork(Level level, Map.Entry<UniqueReplayerRequestKey, TrackedFuture<String, TransformedTargetRequestAndResponseList>>[] entryArr) {
        log.atLevel(level).setMessage("All remaining work to wait on {}").addArgument(Integer.valueOf(entryArr.length)).log();
        if (log.isInfoEnabled()) {
            LoggingEventBuilder atTrace = log.isTraceEnabled() ? log.atTrace() : log.atInfo();
            long j = log.isTraceEnabled() ? Long.MAX_VALUE : 10L;
            atTrace.setMessage(" items: {}").addArgument(() -> {
                return Arrays.stream(entryArr).map(entry -> {
                    return entry.getKey() + " --> " + ((TrackedFuture) entry.getValue()).formatAsString(TrafficReplayerTopLevel::formatWorkItem);
                }).limit(j).collect(Collectors.joining(HttpByteBufFormatter.LF_LINE_DELIMITER));
            }).log();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static String formatWorkItem(TrackedFuture<String, ?> trackedFuture) {
        try {
            Object obj = trackedFuture.get();
            if (obj instanceof TransformedTargetRequestAndResponseList) {
                return ((TransformedTargetRequestAndResponseList) obj).getTransformationStatus();
            }
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Exception: " + e.getMessage();
        } catch (ExecutionException e2) {
            return e2.getMessage();
        }
    }

    @Override // org.opensearch.migrations.replay.RequestTransformerAndSender
    protected boolean shouldRetry() {
        return !this.stopReadingRef.get();
    }

    @Override // org.opensearch.migrations.replay.TrafficReplayerCore
    @NonNull
    public CompletableFuture<Void> shutdown(Error error) {
        log.atWarn().setCause(error).setMessage("Shutting down {}").addArgument(this).log();
        this.shutdownReasonRef.compareAndSet(null, error);
        if (!this.shutdownFutureRef.compareAndSet(null, new CompletableFuture<>())) {
            LoggingEventBuilder message = log.atError().setMessage("Shutdown was already signaled by {}.  Ignoring this shutdown request due to {}.");
            AtomicReference<Error> atomicReference = this.shutdownReasonRef;
            Objects.requireNonNull(atomicReference);
            message.addArgument(atomicReference::get).addArgument(error).log();
            return this.shutdownFutureRef.get();
        }
        this.stopReadingRef.set(true);
        this.liveTrafficStreamLimiter.close();
        this.clientConnectionPool.shutdownNow().whenComplete((r4, th) -> {
            if (th != null) {
                this.shutdownFutureRef.get().completeExceptionally(th);
            } else {
                this.shutdownFutureRef.get().complete(null);
            }
        });
        Optional.ofNullable(this.nextChunkFutureRef.get()).ifPresent(completableFuture -> {
            completableFuture.cancel(true);
        });
        TextTrackedFuture<Void> completedFuture = error == null ? TextTrackedFuture.completedFuture((Object) null, () -> {
            return "TrafficReplayer shutdown";
        }) : TextTrackedFuture.failedFuture(error, () -> {
            return "TrafficReplayer shutdown";
        });
        while (true) {
            if (this.allRemainingWorkFutureOrShutdownSignalRef.compareAndSet(null, completedFuture)) {
                break;
            }
            TextTrackedFuture<Void> textTrackedFuture = this.allRemainingWorkFutureOrShutdownSignalRef.get();
            if (textTrackedFuture != null) {
                textTrackedFuture.future.cancel(true);
                break;
            }
        }
        CompletableFuture<Void> completableFuture2 = this.shutdownFutureRef.get();
        log.atWarn().setMessage("Shutdown setup has been initiated").log();
        return completableFuture2;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        shutdown(null).get();
    }

    static {
        $assertionsDisabled = !TrafficReplayerTopLevel.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(TrafficReplayerTopLevel.class);
        targetConnectionPoolUniqueCounter = new AtomicInteger();
    }
}
