package org.opensearch.migrations.replay;

import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer;
import org.opensearch.migrations.replay.datatypes.ChannelTask;
import org.opensearch.migrations.replay.datatypes.ChannelTaskType;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction;
import org.opensearch.migrations.replay.datatypes.TimeToResponseFulfillmentFutureMap;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

/* loaded from: input_file:org/opensearch/migrations/replay/RequestSenderOrchestrator.class */
public class RequestSenderOrchestrator {
    private static final Logger log;
    public final ClientConnectionPool clientConnectionPool;
    static final /* synthetic */ boolean $assertionsDisabled;

    public RequestSenderOrchestrator(ClientConnectionPool clientConnectionPool) {
        this.clientConnectionPool = clientConnectionPool;
    }

    public <T> DiagnosticTrackableCompletableFuture<String, T> scheduleWork(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Supplier<DiagnosticTrackableCompletableFuture<String, T>> supplier) {
        ConnectionReplaySession cachedSession = this.clientConnectionPool.getCachedSession(iReplayerHttpTransactionContext.getChannelKeyContext(), iReplayerHttpTransactionContext.getReplayerRequestKey().sourceRequestIndexSessionIdentifier);
        StringTrackableCompletableFuture stringTrackableCompletableFuture = new StringTrackableCompletableFuture(new CompletableFuture(), () -> {
            return "waiting for final signal to confirm processing work has finished";
        });
        log.atDebug().setMessage(() -> {
            return "Scheduling work for " + iReplayerHttpTransactionContext.getConnectionId() + " at time " + instant;
        }).log();
        IReplayContexts.IScheduledContext createScheduledContext = iReplayerHttpTransactionContext.createScheduledContext(instant);
        cachedSession.eventLoop.schedule(() -> {
            createScheduledContext.close();
            return ((DiagnosticTrackableCompletableFuture) supplier.get()).map(completableFuture -> {
                return completableFuture.whenComplete((obj, th) -> {
                    if (th != null) {
                        stringTrackableCompletableFuture.future.completeExceptionally(th);
                    } else {
                        stringTrackableCompletableFuture.future.complete(obj);
                    }
                });
            }, () -> {
                return "";
            });
        }, getDelayFromNowMs(instant), TimeUnit.MILLISECONDS);
        return stringTrackableCompletableFuture;
    }

    public DiagnosticTrackableCompletableFuture<String, AggregatedRawResponse> scheduleRequest(UniqueReplayerRequestKey uniqueReplayerRequestKey, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Duration duration, Stream<ByteBuf> stream) {
        StringTrackableCompletableFuture stringTrackableCompletableFuture = new StringTrackableCompletableFuture(new CompletableFuture(), () -> {
            return "waiting for final aggregated response";
        });
        return asynchronouslyInvokeRunnable((IReplayContexts.IChannelKeyContext) iReplayerHttpTransactionContext.getLogicalEnclosingScope(), uniqueReplayerRequestKey.sourceRequestIndexSessionIdentifier, uniqueReplayerRequestKey.getReplayerRequestIndex(), false, stringTrackableCompletableFuture, connectionReplaySession -> {
            scheduleSendRequestOnConnectionReplaySession(iReplayerHttpTransactionContext, connectionReplaySession, stringTrackableCompletableFuture, instant, duration, stream);
        });
    }

    public StringTrackableCompletableFuture<Void> scheduleClose(IReplayContexts.IChannelKeyContext iChannelKeyContext, int i, int i2, Instant instant) {
        IndexedChannelInteraction indexedChannelInteraction = new IndexedChannelInteraction(iChannelKeyContext.getChannelKey(), i2);
        StringTrackableCompletableFuture<Void> stringTrackableCompletableFuture = new StringTrackableCompletableFuture<>(new CompletableFuture(), () -> {
            return "waiting for final signal to confirm close has finished";
        });
        log.atDebug().setMessage(() -> {
            return "Scheduling CLOSE for " + indexedChannelInteraction + " at time " + instant;
        }).log();
        asynchronouslyInvokeRunnable(iChannelKeyContext, i, i2, true, stringTrackableCompletableFuture, connectionReplaySession -> {
            scheduleOnConnectionReplaySession(iChannelKeyContext, i2, connectionReplaySession, stringTrackableCompletableFuture, instant, new ChannelTask(ChannelTaskType.CLOSE, () -> {
                log.trace("Closing client connection " + indexedChannelInteraction);
                this.clientConnectionPool.closeConnection(iChannelKeyContext, i);
                stringTrackableCompletableFuture.future.complete(null);
            }));
        });
        return stringTrackableCompletableFuture;
    }

    private <T> DiagnosticTrackableCompletableFuture<String, T> asynchronouslyInvokeRunnable(IReplayContexts.IChannelKeyContext iChannelKeyContext, int i, int i2, boolean z, DiagnosticTrackableCompletableFuture<String, T> diagnosticTrackableCompletableFuture, Consumer<ConnectionReplaySession> consumer) {
        ConnectionReplaySession cachedSession = this.clientConnectionPool.getCachedSession(iChannelKeyContext, i);
        cachedSession.eventLoop.submit(() -> {
            log.atTrace().setMessage(() -> {
                return "adding work item at slot " + i2 + " for " + cachedSession.getChannelKeyContext() + " with " + cachedSession.scheduleSequencer;
            }).log();
            cachedSession.scheduleSequencer.add(i2, () -> {
                consumer.accept(cachedSession);
            }, (v0) -> {
                v0.run();
            });
            log.atLevel(cachedSession.scheduleSequencer.hasPending() ? Level.DEBUG : Level.TRACE).setMessage(() -> {
                return "Sequencer for " + cachedSession.getChannelKeyContext() + " = " + cachedSession.scheduleSequencer;
            }).log();
        });
        return diagnosticTrackableCompletableFuture;
    }

    private void scheduleSendRequestOnConnectionReplaySession(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, ConnectionReplaySession connectionReplaySession, StringTrackableCompletableFuture<AggregatedRawResponse> stringTrackableCompletableFuture, Instant instant, Duration duration, Stream<ByteBuf> stream) {
        EventLoop eventLoop = connectionReplaySession.eventLoop;
        IReplayContexts.IScheduledContext createScheduledContext = iReplayerHttpTransactionContext.createScheduledContext(instant);
        scheduleOnConnectionReplaySession((IReplayContexts.IChannelKeyContext) iReplayerHttpTransactionContext.getLogicalEnclosingScope(), iReplayerHttpTransactionContext.getReplayerRequestKey().getSourceRequestIndex(), connectionReplaySession, stringTrackableCompletableFuture, instant, new ChannelTask(ChannelTaskType.TRANSMIT, () -> {
            createScheduledContext.close();
            sendNextPartAndContinue(new NettyPacketToHttpConsumer(connectionReplaySession, iReplayerHttpTransactionContext), eventLoop, stream.iterator(), instant, duration, new AtomicInteger(), stringTrackableCompletableFuture);
        }));
    }

    private <T> void scheduleOnConnectionReplaySession(IReplayContexts.IChannelKeyContext iChannelKeyContext, int i, ConnectionReplaySession connectionReplaySession, StringTrackableCompletableFuture<T> stringTrackableCompletableFuture, Instant instant, ChannelTask channelTask) {
        IndexedChannelInteraction indexedChannelInteraction = new IndexedChannelInteraction(iChannelKeyContext.getChannelKey(), i);
        log.atInfo().setMessage(() -> {
            return indexedChannelInteraction + " scheduling " + channelTask.kind + " at " + instant;
        }).log();
        TimeToResponseFulfillmentFutureMap timeToResponseFulfillmentFutureMap = connectionReplaySession.schedule;
        EventLoop eventLoop = connectionReplaySession.eventLoop;
        if (timeToResponseFulfillmentFutureMap.isEmpty()) {
            eventLoop.schedule(channelTask.runnable, getDelayFromNowMs(instant), TimeUnit.MILLISECONDS).addListener(future -> {
                if (future.isSuccess()) {
                    log.atInfo().setMessage(() -> {
                        return "scheduled task has finished for " + iChannelKeyContext + " interaction: " + indexedChannelInteraction;
                    }).log();
                } else {
                    log.atError().setCause(future.cause()).setMessage(() -> {
                        return "Error running the scheduled task: " + iChannelKeyContext + " interaction: " + indexedChannelInteraction;
                    }).log();
                }
            });
        } else if (!$assertionsDisabled && instant.isBefore(timeToResponseFulfillmentFutureMap.peekFirstItem().getKey())) {
            throw new AssertionError("Per-connection TrafficStream ordering should force a time ordering on incoming requests");
        }
        timeToResponseFulfillmentFutureMap.appendTask(instant, channelTask);
        log.atTrace().setMessage(() -> {
            return indexedChannelInteraction + " added a scheduled event at " + instant + "... " + timeToResponseFulfillmentFutureMap;
        }).log();
        stringTrackableCompletableFuture.map(completableFuture -> {
            return completableFuture.whenComplete((obj, th) -> {
                Instant removeFirstItem = timeToResponseFulfillmentFutureMap.removeFirstItem();
                if (!$assertionsDisabled && !instant.equals(removeFirstItem)) {
                    throw new AssertionError("Expected to have popped the item to match the start time for the responseFuture that finished");
                }
                log.atDebug().setMessage(() -> {
                    return indexedChannelInteraction.toString() + " responseFuture completed - checking " + timeToResponseFulfillmentFutureMap + " for the next item to schedule";
                }).log();
                Optional.ofNullable(timeToResponseFulfillmentFutureMap.peekFirstItem()).ifPresent(entry -> {
                    eventLoop.schedule(((ChannelTask) entry.getValue()).runnable, getDelayFromNowMs((Instant) entry.getKey()), TimeUnit.MILLISECONDS).addListener(future2 -> {
                        if (future2.isSuccess()) {
                            return;
                        }
                        log.atWarn().setCause(future2.cause()).setMessage(() -> {
                            return "Scheduled future did not successfully run " + indexedChannelInteraction;
                        }).log();
                    });
                });
            });
        }, () -> {
            return "";
        });
    }

    private Instant now() {
        return Instant.now();
    }

    private long getDelayFromNowMs(Instant instant) {
        return Math.max(0L, Duration.between(now(), instant).toMillis());
    }

    private void sendNextPartAndContinue(NettyPacketToHttpConsumer nettyPacketToHttpConsumer, EventLoop eventLoop, Iterator<ByteBuf> it, Instant instant, Duration duration, AtomicInteger atomicInteger, StringTrackableCompletableFuture<AggregatedRawResponse> stringTrackableCompletableFuture) {
        int andIncrement = atomicInteger.getAndIncrement();
        log.atTrace().setMessage(() -> {
            return "sendNextPartAndContinue: counter=" + andIncrement;
        }).log();
        if (!$assertionsDisabled && !it.hasNext()) {
            throw new AssertionError("Should not have called this with no items to send");
        }
        nettyPacketToHttpConsumer.consumeBytes(it.next());
        if (it.hasNext()) {
            eventLoop.schedule(() -> {
                sendNextPartAndContinue(nettyPacketToHttpConsumer, eventLoop, it, instant, duration, atomicInteger, stringTrackableCompletableFuture);
            }, Math.min(0L, Duration.between(now(), instant.plus((TemporalAmount) duration.multipliedBy(atomicInteger.get()))).toMillis()), TimeUnit.MILLISECONDS);
        } else {
            nettyPacketToHttpConsumer.finalizeRequest().handle((aggregatedRawResponse, th) -> {
                if (th != null) {
                    stringTrackableCompletableFuture.future.completeExceptionally(th);
                    return null;
                }
                stringTrackableCompletableFuture.future.complete(aggregatedRawResponse);
                return null;
            }, () -> {
                return "waiting for finalize to send Aggregated Response";
            });
        }
    }

    static {
        $assertionsDisabled = !RequestSenderOrchestrator.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(RequestSenderOrchestrator.class);
    }
}
