package org.opensearch.migrations.replay;

import io.netty.buffer.Unpooled;
import java.time.Instant;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import lombok.NonNull;
import org.opensearch.migrations.replay.RequestSenderOrchestrator;
import org.opensearch.migrations.replay.datahandlers.IPacketFinalizingConsumer;
import org.opensearch.migrations.replay.datatypes.ByteBufList;
import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus;
import org.opensearch.migrations.replay.datatypes.TransformedOutputAndResult;
import org.opensearch.migrations.replay.http.retries.IRetryVisitorFactory;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.TextTrackedFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;

/* loaded from: input_file:org/opensearch/migrations/replay/RequestTransformerAndSender.class */
/**
 * Runs a captured request's packets through a transforming packet handler and then schedules the
 * transformed bytes on a {@link ReplayEngine}, attaching a retry visitor that reports every
 * response to {@link #perResponseConsumer} before a retry decision is made.
 *
 * <p>Subclasses may override {@link #shouldRetry()} to turn retries off and
 * {@link #perResponseConsumer} to observe each response.
 *
 * @param <T> the value type produced by the retry visitor and carried by the returned future
 */
public class RequestTransformerAndSender<T> {
    private static final Logger log = LoggerFactory.getLogger(RequestTransformerAndSender.class);

    /** Supplies the visitor that makes the retry decision once a response has been received. */
    protected final IRetryVisitorFactory<T> retryVisitorFactory;

    /**
     * @param iRetryVisitorFactory factory consulted by {@link #getRetryCheckVisitor} for every request
     */
    public RequestTransformerAndSender(IRetryVisitorFactory<T> iRetryVisitorFactory) {
        this.retryVisitorFactory = iRetryVisitorFactory;
    }

    /**
     * Builds the retry visitor handed to the request scheduler. The returned visitor:
     * <ol>
     *   <li>always passes the response to {@code consumer} first (the response may be
     *       {@code null} when the attempt ended with an exception);</li>
     *   <li>answers {@code DONE} when {@link #shouldRetry()} is {@code false};</li>
     *   <li>answers {@code RETRY} when the attempt failed with a throwable;</li>
     *   <li>otherwise delegates to the visitor obtained from {@link #retryVisitorFactory}.</li>
     * </ol>
     *
     * @param transformedOutputAndResult the transformed request, forwarded to the factory
     * @param trackedFuture future forwarded to the factory alongside the transformed request
     * @param consumer callback invoked with every response before any retry decision
     * @return a visitor wrapping the factory-provided one with the checks above
     */
    RequestSenderOrchestrator.RetryVisitor<T> getRetryCheckVisitor(TransformedOutputAndResult<ByteBufList> transformedOutputAndResult, TrackedFuture<String, ? extends IRequestResponsePacketPair> trackedFuture, Consumer<AggregatedRawResponse> consumer) {
        // Resolved once, up front, so each retry attempt reuses the same delegate.
        RequestSenderOrchestrator.RetryVisitor<T> delegate = this.retryVisitorFactory.getRetryCheckVisitor(transformedOutputAndResult, trackedFuture);
        return (byteBuf, aggregatedRawResponse, th) -> {
            consumer.accept(aggregatedRawResponse);
            if (!shouldRetry()) {
                return TextTrackedFuture.completedFuture(
                    new RequestSenderOrchestrator.DeterminedTransformedResponse<>(RequestSenderOrchestrator.RetryDirective.DONE, null),
                    () -> "Returning a future to NOT retry because the class is currently prohibiting retries"
                );
            }
            if (th != null) {
                return TextTrackedFuture.completedFuture(
                    new RequestSenderOrchestrator.DeterminedTransformedResponse<>(RequestSenderOrchestrator.RetryDirective.RETRY, null),
                    () -> "Returning a future to retry due to a connection exception"
                );
            }
            // No throwable was reported, so a response is expected to be present here.
            assert aggregatedRawResponse != null;
            return delegate.visit(byteBuf, aggregatedRawResponse, th);
        };
    }

    /**
     * Whether failed or unsatisfactory attempts may be retried. This base implementation always
     * returns {@code true}.
     *
     * @return {@code true} to allow retries, {@code false} to finish after the current attempt
     */
    protected boolean shouldRetry() {
        return true;
    }

    /**
     * Hook invoked for every response seen by the retry visitor. This base implementation does
     * nothing.
     *
     * @param aggregatedRawResponse the response for the attempt; may be {@code null} when the
     *        attempt failed with an exception
     * @param httpRequestTransformationStatus transformation status of the request that was sent
     * @param iReplayerHttpTransactionContext context of the transaction being replayed
     */
    protected void perResponseConsumer(AggregatedRawResponse aggregatedRawResponse, HttpRequestTransformationStatus httpRequestTransformationStatus, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext) {
    }

    /**
     * Schedules transformation of the supplied packets and, once that completes, schedules the
     * transformed request to be sent with the retry visitor from {@link #getRetryCheckVisitor}.
     *
     * <p>Any exception thrown synchronously while setting up this pipeline is converted into a
     * failed future instead of propagating to the caller.
     *
     * @param packetToTransformingHttpHandlerFactory creates the packet handler for this transaction
     * @param replayEngine engine used to schedule both the transformation and the send
     * @param trackedFuture future forwarded to the retry visitor factory
     * @param iReplayerHttpTransactionContext context of the transaction being replayed
     * @param instant timestamp passed to both scheduling calls; the null-check message calls it
     *        "start"
     * @param instant2 timestamp passed to the send scheduling call; the null-check message calls
     *        it "end"
     * @param supplier produces the stream of raw request packets; it may be invoked again to
     *        build a diagnostic if transformation fails
     * @return a future for the value produced by the scheduled request, or a failed future if
     *         scheduling threw
     * @throws NullPointerException if {@code instant} or {@code instant2} is {@code null}
     */
    public TrackedFuture<String, T> transformAndSendRequest(PacketToTransformingHttpHandlerFactory packetToTransformingHttpHandlerFactory, ReplayEngine replayEngine, TrackedFuture<String, RequestResponsePacketPair> trackedFuture, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, @NonNull Instant instant, @NonNull Instant instant2, Supplier<Stream<byte[]>> supplier) {
        if (instant == null) {
            throw new NullPointerException("start is marked non-null but is null");
        }
        if (instant2 == null) {
            throw new NullPointerException("end is marked non-null but is null");
        }
        try {
            // The transformation yields the transformed bytes plus a status, not the final T;
            // T is only produced by the request scheduled in thenCompose below.
            TrackedFuture<String, TransformedOutputAndResult<ByteBufList>> requestReadyFuture =
                replayEngine.scheduleTransformationWork(
                    iReplayerHttpTransactionContext,
                    instant,
                    () -> transformAllData(packetToTransformingHttpHandlerFactory.create(iReplayerHttpTransactionContext), supplier)
                );
            log.atDebug().setMessage("request transform future for {} = {}").addArgument(iReplayerHttpTransactionContext).addArgument(requestReadyFuture).log();
            return requestReadyFuture.thenCompose(
                transformedRequest -> replayEngine.scheduleRequest(
                    iReplayerHttpTransactionContext,
                    instant,
                    instant2,
                    transformedRequest.transformedOutput.size(),
                    transformedRequest.transformedOutput,
                    getRetryCheckVisitor(
                        transformedRequest,
                        trackedFuture,
                        response -> perResponseConsumer(response, transformedRequest.transformationStatus, iReplayerHttpTransactionContext)
                    )
                ),
                () -> "transitioning transformed packets onto the wire"
            );
        } catch (Exception e) {
            // The exception also travels in the failed future; attaching it here keeps the debug
            // log self-contained.
            log.debug("Caught exception in transformAndSendRequest, so failing future", e);
            return TextTrackedFuture.failedFuture(e, () -> "TrafficReplayer.writeToSocketAndClose");
        }
    }

    /**
     * Feeds every packet from {@code supplier} to the handler, in stream order, and then
     * finalizes the request.
     *
     * <p>If anything throws, the packets are obtained from {@code supplier} a second time to log
     * a compressed copy of the traffic for diagnosis, and the exception is rethrown.
     *
     * <p>NOTE: the decompiler reported widening this method from {@code private}; the visibility
     * is left as found.
     *
     * @param iPacketFinalizingConsumer handler that consumes the packets and produces the result
     * @param supplier produces the stream of raw packets
     * @param <R> result type of the handler
     * @return the future returned by the handler's {@code finalizeRequest()}
     */
    public static <R> TrackedFuture<String, R> transformAllData(IPacketFinalizingConsumer<R> iPacketFinalizingConsumer, Supplier<Stream<byte[]>> supplier) {
        try {
            String logLabel = iPacketFinalizingConsumer.getClass().getSimpleName();
            supplier.get().map(Unpooled::wrappedBuffer).forEach(packetData -> {
                log.atDebug().setMessage("{} sending {} bytes to the packetHandler")
                    .addArgument(logLabel)
                    .addArgument(packetData::readableBytes)
                    .log();
                // This call does the real work and must run whether or not debug logging is on,
                // so it is kept out of the logging expression.
                Object consumeFuture = iPacketFinalizingConsumer.consumeBytes(packetData);
                log.atDebug().setMessage("{} consumeFuture = {}")
                    .addArgument(logLabel)
                    .addArgument(consumeFuture)
                    .log();
            });
            log.atDebug().setMessage("{}  done sending bytes, now finalizing the request").addArgument(logLabel).log();
            return iPacketFinalizingConsumer.finalizeRequest();
        } catch (Exception e) {
            log.atInfo()
                .setCause(e)
                .setMessage("Encountered an exception while transforming the http request.  The base64 gzipped traffic stream, for later diagnostic purposes, is: {}")
                // Supplier form: the packets are only re-read and compressed if INFO is enabled.
                .addArgument(() -> Utils.packetsToCompressedTrafficStream(supplier.get()))
                .log();
            throw e;
        }
    }
}
