package org.opensearch.migrations.replay;

import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import lombok.Generated;
import org.opensearch.migrations.NettyFutureBinders;
import org.opensearch.migrations.replay.datahandlers.IPacketFinalizingConsumer;
import org.opensearch.migrations.replay.datatypes.ByteBufList;
import org.opensearch.migrations.replay.datatypes.ChannelTask;
import org.opensearch.migrations.replay.datatypes.ChannelTaskType;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction;
import org.opensearch.migrations.replay.datatypes.TimeToResponseFulfillmentFutureMap;
import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.RefSafeHolder;
import org.opensearch.migrations.utils.TextTrackedFuture;
import org.opensearch.migrations.utils.TrackedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:org/opensearch/migrations/replay/RequestSenderOrchestrator.class */
public class RequestSenderOrchestrator {

    @Generated
    private static final Logger log;
    private final ClientConnectionPool clientConnectionPool;
    private final Duration initialRetryDelay;
    private final Duration maxRetryDelay;
    private final BiFunction<ConnectionReplaySession, IReplayContexts.IReplayerHttpTransactionContext, IPacketFinalizingConsumer<AggregatedRawResponse>> packetConsumerFactory;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/opensearch/migrations/replay/RequestSenderOrchestrator$DeterminedTransformedResponse.class */
    public static class DeterminedTransformedResponse<T> {
        RetryDirective directive;
        T value;

        @Generated
        public DeterminedTransformedResponse(RetryDirective retryDirective, T t) {
            this.directive = retryDirective;
            this.value = t;
        }
    }

    /* loaded from: input_file:org/opensearch/migrations/replay/RequestSenderOrchestrator$RetryDirective.class */
    public enum RetryDirective {
        DONE,
        RETRY
    }

    /* loaded from: input_file:org/opensearch/migrations/replay/RequestSenderOrchestrator$RetryVisitor.class */
    public interface RetryVisitor<T> {
        TrackedFuture<String, DeterminedTransformedResponse<T>> visit(ByteBuf byteBuf, AggregatedRawResponse aggregatedRawResponse, Throwable th);
    }

    public RequestSenderOrchestrator(ClientConnectionPool clientConnectionPool, BiFunction<ConnectionReplaySession, IReplayContexts.IReplayerHttpTransactionContext, IPacketFinalizingConsumer<AggregatedRawResponse>> biFunction) {
        this(clientConnectionPool, Duration.ofMillis(100L), Duration.ofSeconds(300L), biFunction);
    }

    public RequestSenderOrchestrator(ClientConnectionPool clientConnectionPool, Duration duration, Duration duration2, BiFunction<ConnectionReplaySession, IReplayContexts.IReplayerHttpTransactionContext, IPacketFinalizingConsumer<AggregatedRawResponse>> biFunction) {
        this.clientConnectionPool = clientConnectionPool;
        this.initialRetryDelay = duration;
        this.maxRetryDelay = duration2;
        this.packetConsumerFactory = biFunction;
    }

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
        return this.clientConnectionPool.scheduleAtFixedRate(runnable, j, j2, timeUnit);
    }

    public <T> TrackedFuture<String, T> scheduleWork(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Supplier<TrackedFuture<String, T>> supplier) {
        ConnectionReplaySession cachedSession = this.clientConnectionPool.getCachedSession(iReplayerHttpTransactionContext.getChannelKeyContext(), iReplayerHttpTransactionContext.getReplayerRequestKey().sourceRequestIndexSessionIdentifier);
        LoggingEventBuilder message = log.atDebug().setMessage("Scheduling work for {} at time {}");
        Objects.requireNonNull(iReplayerHttpTransactionContext);
        message.addArgument(iReplayerHttpTransactionContext::getConnectionId).addArgument(instant).log();
        IReplayContexts.IScheduledContext createScheduledContext = iReplayerHttpTransactionContext.createScheduledContext(instant);
        return bindNettyScheduleToCompletableFuture(cachedSession.eventLoop, instant).getDeferredFutureThroughHandle((r5, th) -> {
            createScheduledContext.close();
            return th == null ? (TrackedFuture) supplier.get() : TextTrackedFuture.failedFuture(th, () -> {
                return "netty scheduling failure";
            });
        }, () -> {
            return "The scheduled callback is running work for " + String.valueOf(iReplayerHttpTransactionContext);
        });
    }

    public <T> TrackedFuture<String, T> scheduleRequest(UniqueReplayerRequestKey uniqueReplayerRequestKey, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Instant instant, Duration duration, ByteBufList byteBufList, RetryVisitor<T> retryVisitor) {
        return submitUnorderedWorkToEventLoop((IReplayContexts.IChannelKeyContext) iReplayerHttpTransactionContext.getLogicalEnclosingScope(), uniqueReplayerRequestKey.sourceRequestIndexSessionIdentifier, uniqueReplayerRequestKey.getReplayerRequestIndex(), connectionReplaySession -> {
            return scheduleSendRequestOnConnectionReplaySession(iReplayerHttpTransactionContext, connectionReplaySession, instant, duration, byteBufList, retryVisitor);
        });
    }

    public TrackedFuture<String, Void> scheduleClose(IReplayContexts.IChannelKeyContext iChannelKeyContext, int i, int i2, Instant instant) {
        IndexedChannelInteraction indexedChannelInteraction = new IndexedChannelInteraction(iChannelKeyContext.getChannelKey(), i2);
        log.atDebug().setMessage("Scheduling CLOSE for {} at time {}").addArgument(indexedChannelInteraction).addArgument(instant).log();
        return submitUnorderedWorkToEventLoop(iChannelKeyContext, i, i2, connectionReplaySession -> {
            return scheduleCloseOnConnectionReplaySession(iChannelKeyContext, connectionReplaySession, instant, i, i2, indexedChannelInteraction);
        });
    }

    private TrackedFuture<String, Void> bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Instant instant) {
        return NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, getDelayFromNowMs(instant));
    }

    private TextTrackedFuture<Void> bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Instant instant, TrackedFuture<String, Void> trackedFuture) {
        Duration delayFromNowMs = getDelayFromNowMs(instant);
        NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, delayFromNowMs, trackedFuture.future);
        return new TextTrackedFuture<>(trackedFuture.future, "scheduling to run next send at " + String.valueOf(instant) + " in " + String.valueOf(delayFromNowMs) + "ms");
    }

    private CompletableFuture<Void> bindNettyScheduleToCompletableFuture(EventLoop eventLoop, Instant instant, CompletableFuture<Void> completableFuture) {
        return NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, getDelayFromNowMs(instant), completableFuture);
    }

    private <T> TrackedFuture<String, T> submitUnorderedWorkToEventLoop(IReplayContexts.IChannelKeyContext iChannelKeyContext, int i, int i2, Function<ConnectionReplaySession, TrackedFuture<String, T>> function) {
        ConnectionReplaySession cachedSession = this.clientConnectionPool.getCachedSession(iChannelKeyContext, i);
        return NettyFutureBinders.bindNettySubmitToTrackableFuture(cachedSession.eventLoop).getDeferredFutureThroughHandle((r8, th) -> {
            LoggingEventBuilder addArgument = log.atTrace().setMessage("adding work item at slot {} for {} with {}").addArgument(Integer.valueOf(i2));
            Objects.requireNonNull(cachedSession);
            addArgument.addArgument(cachedSession::getChannelKeyContext).addArgument(cachedSession.scheduleSequencer).log();
            return cachedSession.scheduleSequencer.addFutureForWork(i2, trackedFuture -> {
                return trackedFuture.thenCompose(r5 -> {
                    return (TrackedFuture) function.apply(cachedSession);
                }, () -> {
                    return "Work callback on replay session";
                });
            });
        }, () -> {
            return "Waiting for sequencer to finish for slot " + i2;
        });
    }

    private <T> TrackedFuture<String, T> scheduleSendRequestOnConnectionReplaySession(IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, ConnectionReplaySession connectionReplaySession, Instant instant, Duration duration, ByteBufList byteBufList, RetryVisitor<T> retryVisitor) {
        EventLoop eventLoop = connectionReplaySession.eventLoop;
        IReplayContexts.IScheduledContext createScheduledContext = iReplayerHttpTransactionContext.createScheduledContext(instant);
        IndexedChannelInteraction indexedChannelInteraction = new IndexedChannelInteraction(((IReplayContexts.IChannelKeyContext) iReplayerHttpTransactionContext.getLogicalEnclosingScope()).getChannelKey(), iReplayerHttpTransactionContext.getReplayerRequestKey().getSourceRequestIndex());
        byteBufList.retain();
        return scheduleOnConnectionReplaySession(indexedChannelInteraction, connectionReplaySession, instant, new ChannelTask<>(ChannelTaskType.TRANSMIT, trackedFuture -> {
            return trackedFuture.thenCompose(r18 -> {
                createScheduledContext.close();
                return sendRequestWithRetries(() -> {
                    return this.packetConsumerFactory.apply(connectionReplaySession, iReplayerHttpTransactionContext);
                }, eventLoop, byteBufList, instant, this.initialRetryDelay, duration, retryVisitor);
            }, () -> {
                return "sending packets for request";
            });
        })).whenComplete((obj, th) -> {
            byteBufList.release();
        }, () -> {
            return "waiting for request to be sent to release ByteBufList";
        });
    }

    private TrackedFuture<String, Void> scheduleCloseOnConnectionReplaySession(IReplayContexts.IChannelKeyContext iChannelKeyContext, ConnectionReplaySession connectionReplaySession, Instant instant, int i, int i2, IndexedChannelInteraction indexedChannelInteraction) {
        return scheduleOnConnectionReplaySession(new IndexedChannelInteraction(iChannelKeyContext.getChannelKey(), i2), connectionReplaySession, instant, new ChannelTask(ChannelTaskType.CLOSE, trackedFuture -> {
            return trackedFuture.whenComplete((r8, th) -> {
                log.trace("Calling closeConnection at slot " + String.valueOf(indexedChannelInteraction));
                this.clientConnectionPool.closeConnection(iChannelKeyContext, i);
            }, () -> {
                return "Close connection";
            });
        }));
    }

    private <T> TrackedFuture<String, T> scheduleOnConnectionReplaySession(IndexedChannelInteraction indexedChannelInteraction, ConnectionReplaySession connectionReplaySession, Instant instant, ChannelTask<T> channelTask) {
        log.atInfo().setMessage("{} scheduling {} at {}").addArgument(indexedChannelInteraction).addArgument(channelTask.kind).addArgument(instant).log();
        TimeToResponseFulfillmentFutureMap timeToResponseFulfillmentFutureMap = connectionReplaySession.schedule;
        EventLoop eventLoop = connectionReplaySession.eventLoop;
        boolean isEmpty = timeToResponseFulfillmentFutureMap.isEmpty();
        if (!$assertionsDisabled && !isEmpty && instant.isBefore(timeToResponseFulfillmentFutureMap.peekFirstItem().startTime)) {
            throw new AssertionError("Per-connection TrafficStream ordering should force a time ordering on incoming requests");
        }
        TrackedFuture<String, Void> trackedFuture = timeToResponseFulfillmentFutureMap.appendTaskTrigger(instant, channelTask.kind).scheduleFuture;
        TrackedFuture<String, T> trackedFuture2 = (TrackedFuture) channelTask.getRunnable().apply(trackedFuture);
        log.atTrace().setMessage("{} added a scheduled event at {}... {}").addArgument(indexedChannelInteraction).addArgument(instant).addArgument(timeToResponseFulfillmentFutureMap).log();
        if (isEmpty) {
            bindNettyScheduleToCompletableFuture(eventLoop, instant, trackedFuture.future);
        }
        trackedFuture2.map(completableFuture -> {
            return completableFuture.whenComplete((obj, th) -> {
                Instant removeFirstItem = timeToResponseFulfillmentFutureMap.removeFirstItem();
                if (!$assertionsDisabled && !instant.equals(removeFirstItem)) {
                    throw new AssertionError("Expected to have popped the item to match the start time for the responseFuture that finished");
                }
                LoggingEventBuilder message = log.atDebug().setMessage("{} responseFuture completed - checking {} for the next item to schedule");
                Objects.requireNonNull(indexedChannelInteraction);
                message.addArgument(indexedChannelInteraction::toString).addArgument(timeToResponseFulfillmentFutureMap).log();
                Optional.ofNullable(timeToResponseFulfillmentFutureMap.peekFirstItem()).ifPresent(futureWorkPoint -> {
                    bindNettyScheduleToCompletableFuture(eventLoop, futureWorkPoint.startTime, futureWorkPoint.scheduleFuture);
                });
            });
        }, () -> {
            return "";
        });
        return trackedFuture2;
    }

    private Instant now() {
        return Instant.now();
    }

    private Duration getDelayFromNowMs(Instant instant) {
        return Duration.ofMillis(Math.max(0L, Duration.between(now(), instant).toMillis()));
    }

    private Duration doubleRetryDelayCapped(Duration duration) {
        return Duration.ofMillis(Math.min(duration.multipliedBy(2L).toMillis(), this.maxRetryDelay.toMillis()));
    }

    private <T> TrackedFuture<String, T> sendRequestWithRetries(Supplier<IPacketFinalizingConsumer<AggregatedRawResponse>> supplier, EventLoop eventLoop, ByteBufList byteBufList, Instant instant, Duration duration, Duration duration2, RetryVisitor<T> retryVisitor) {
        return eventLoop.isShuttingDown() ? TextTrackedFuture.failedFuture(new IllegalStateException("EventLoop is shutting down"), () -> {
            return "sendRequestWithRetries is failing due to the pending shutdown of the EventLoop";
        }) : sendPackets(supplier.get(), eventLoop, byteBufList.streamUnretained().iterator(), instant, duration2, new AtomicInteger()).getDeferredFutureThroughHandle((aggregatedRawResponse, th) -> {
            RefSafeHolder create = RefSafeHolder.create(byteBufList.asCompositeByteBufRetained());
            try {
                TrackedFuture visit = retryVisitor.visit((ByteBuf) create.get(), aggregatedRawResponse, th);
                if (create != null) {
                    create.close();
                }
                return visit;
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }, () -> {
            return "checking response to determine if the request should be retried";
        }).getDeferredFutureThroughHandle((determinedTransformedResponse, th2) -> {
            if (th2 != null) {
                return TextTrackedFuture.failedFuture(th2, () -> {
                    return "failed future";
                });
            }
            if (determinedTransformedResponse.directive != RetryDirective.RETRY) {
                return TextTrackedFuture.completedFuture(determinedTransformedResponse.value, () -> {
                    return "done retrying and returning received response";
                });
            }
            Instant plus = instant.plus((TemporalAmount) duration);
            log.atInfo().setMessage("Making request scheduled at {}").addArgument(plus).log();
            Duration between = Duration.between(now(), plus);
            return NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, between).thenCompose(r17 -> {
                return sendRequestWithRetries(supplier, eventLoop, byteBufList, plus, doubleRetryDelayCapped(duration), duration2, retryVisitor);
            }, () -> {
                return "retrying request with delay of " + String.valueOf(between);
            });
        }, () -> {
            return "determining if the response must be retried or if it should be returned now";
        });
    }

    private TrackedFuture<String, AggregatedRawResponse> sendPackets(IPacketFinalizingConsumer<AggregatedRawResponse> iPacketFinalizingConsumer, EventLoop eventLoop, Iterator<ByteBuf> it, Instant instant, Duration duration, AtomicInteger atomicInteger) {
        log.atTrace().setMessage("sendNextPartAndContinue: packetCounter={}").addArgument(Integer.valueOf(atomicInteger.getAndIncrement())).log();
        if (!$assertionsDisabled && !it.hasNext()) {
            throw new AssertionError("Should not have called this with no items to send");
        }
        TrackedFuture<String, Void> consumeBytes = iPacketFinalizingConsumer.consumeBytes(it.next().retainedDuplicate());
        return it.hasNext() ? consumeBytes.thenCompose(r16 -> {
            return NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, Duration.between(now(), instant.plus((TemporalAmount) duration.multipliedBy(atomicInteger.get())))).thenCompose(r15 -> {
                return sendPackets(iPacketFinalizingConsumer, eventLoop, it, instant, duration, atomicInteger);
            }, () -> {
                return "sending next packet";
            });
        }, () -> {
            return "recursing, once ready";
        }) : consumeBytes.getDeferredFutureThroughHandle((r3, th) -> {
            return iPacketFinalizingConsumer.finalizeRequest();
        }, () -> {
            return "finalizing, once ready";
        });
    }

    static {
        $assertionsDisabled = !RequestSenderOrchestrator.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(RequestSenderOrchestrator.class);
    }
}
