package org.opensearch.migrations.replay.datahandlers;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.Future;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
import lombok.NonNull;
import org.opensearch.migrations.NettyFutureBinders;
import org.opensearch.migrations.replay.AggregatedRawResponse;
import org.opensearch.migrations.replay.datahandlers.http.helpers.ReadMeteringHandler;
import org.opensearch.migrations.replay.datahandlers.http.helpers.WriteMeteringHandler;
import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession;
import org.opensearch.migrations.replay.netty.BacksideHttpWatcherHandler;
import org.opensearch.migrations.replay.netty.BacksideSnifferHandler;
import org.opensearch.migrations.replay.tracing.IReplayContexts;
import org.opensearch.migrations.replay.util.TextTrackedFuture;
import org.opensearch.migrations.replay.util.TrackedFuture;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;
import org.opensearch.migrations.tracing.IWithTypedEnclosingScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.class */
public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer<AggregatedRawResponse> {
    // Class-wide logger; assigned in the static initializer of this class.
    private static final Logger log;
    // When a level is present, addLoggingHandlerLast() inserts a Netty LoggingHandler at that level.
    private static final Optional<LogLevel> PIPELINE_LOGGING_OPTIONAL;
    // Upper bound for the delay between connection retries in createClientConnection().
    private static final Duration MAX_WAIT_BETWEEN_CREATE_RETRIES;
    // Names under which this class registers (and later looks up/removes) handlers in the channel pipeline.
    public static final String BACKSIDE_HTTP_WATCHER_HANDLER_NAME = "BACKSIDE_HTTP_WATCHER_HANDLER";
    public static final String CONNECTION_CLOSE_HANDLER_NAME = "CONNECTION_CLOSE_HANDLER";
    public static final String SSL_HANDLER_NAME = "ssl";
    public static final String READ_TIMEOUT_HANDLER_NAME = "readTimeoutHandler";
    public static final String WRITE_COUNT_WATCHER_HANDLER_NAME = "writeCountWatcher";
    public static final String READ_COUNT_WATCHER_HANDLER_NAME = "readCountWatcher";
    // Tail of the chain of channel work; starts as the channel-activation future and is replaced by each consumeBytes() call.
    TrackedFuture<String, Void> activeChannelFuture;
    // Session that supplies the channel-key context and the (possibly reused) channel future.
    ConnectionReplaySession replaySession;
    // Set by activateLiveChannel() once an active channel was obtained; null until then.
    private Channel channel;
    // Accumulates the response; shared with the sniffer and watcher handlers installed on the pipeline.
    AggregatedRawResponse.Builder responseBuilder;
    // The currently open request-phase context; written only through setCurrentMessageContext().
    IWithTypedEnclosingScope<IReplayContexts.ITargetRequestContext> currentRequestContextUnion;
    // Read timeout applied to the ReadTimeoutHandler that initializeRequestHandlers() installs.
    Duration readTimeoutDuration;
    // Assertion switch surfaced by the decompiler; true when assertions are disabled for this class.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer$ChannelNotActiveException.class */
    /**
     * Signals that a connect attempt finished without an error but the resulting channel
     * was not active. Created by {@code createClientConnection} for exactly that case.
     */
    public static class ChannelNotActiveException extends IOException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer$ConnectionClosedListenerHandler.class */
    public static class ConnectionClosedListenerHandler extends ChannelInboundHandlerAdapter {
        private final IReplayContexts.ISocketContext socketContext;

        ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext iChannelKeyContext) {
            this.socketContext = iChannelKeyContext.createSocketContext();
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            this.socketContext.close();
            super.channelInactive(channelHandlerContext);
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            this.socketContext.addTraceException(th, true);
            NettyPacketToHttpConsumer.log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler.Closing channel due to exception").setCause(th).log();
            channelHandlerContext.close();
            super.exceptionCaught(channelHandlerContext, th);
        }
    }

    /**
     * Creates a consumer bound to the given replay session and immediately starts acquiring
     * an active channel for it.
     *
     * @param connectionReplaySession session that provides the channel for this request
     * @param iReplayerHttpTransactionContext transaction context from which the target-request
     *        context and its initial "sending" context are created
     * @param duration read timeout installed on the channel pipeline for this request
     */
    public NettyPacketToHttpConsumer(ConnectionReplaySession connectionReplaySession, IReplayContexts.IReplayerHttpTransactionContext iReplayerHttpTransactionContext, Duration duration) {
        this.replaySession = connectionReplaySession;
        // Must be assigned before activateLiveChannel(): if the channel future is already complete,
        // the activation chain can reach initializeRequestHandlers(), which reads this field.
        this.readTimeoutDuration = duration;
        setCurrentMessageContext(iReplayerHttpTransactionContext.createTargetRequestContext().createHttpSendingContext());
        this.responseBuilder = AggregatedRawResponse.builder(Instant.now());
        log.atDebug().setMessage(() -> "C'tor: incoming session=" + connectionReplaySession).log();
        this.activeChannelFuture = activateLiveChannel();
    }

    /**
     * Obtains a channel from the replay session and prepares it for this request.
     * <ul>
     *   <li>If acquiring the channel failed, the failure is recorded on the channel-key context
     *       and rethrown; no retry happens.</li>
     *   <li>If the channel was acquired but is not active, this method calls itself to ask the
     *       session for another channel.</li>
     *   <li>Otherwise the channel is stored in {@code this.channel} and the request handlers
     *       are installed.</li>
     * </ul>
     *
     * @return a future that completes once {@code this.channel} is set and its pipeline is initialized
     */
    private TrackedFuture<String, Void> activateLiveChannel() {
        final IReplayContexts.IChannelKeyContext channelKeyContext = this.replaySession.getChannelKeyContext();
        return this.replaySession.getChannelFutureInActiveState(getParentContext()).thenCompose(
            channelFuture -> NettyFutureBinders.bindNettyFutureToTrackableFuture((Future<?>) channelFuture, "waiting for newly acquired channel to be ready")
                .getDeferredFutureThroughHandle((ignored, failure) -> {
                    if (failure != null) {
                        channelKeyContext.addFailedChannelCreation();
                        channelKeyContext.addTraceException(channelFuture.cause(), true);
                        log.atWarn().setMessage(() -> "error creating channel, not retrying").setCause(failure).log();
                        // The failure may be a checked Throwable, which a lambda body cannot throw directly.
                        throw lombok.Lombok.sneakyThrow(failure);
                    }
                    final Channel readyChannel = channelFuture.channel();
                    if (!readyChannel.isActive()) {
                        channelKeyContext.addFailedChannelCreation();
                        log.atWarn().setMessage(() -> "Channel wasn't active, trying to create another for this request").log();
                        return activateLiveChannel();
                    }
                    this.channel = readyChannel;
                    initializeRequestHandlers();
                    log.atDebug().setMessage(() -> "Channel initialized for " + channelKeyContext + " signaling future").log();
                    return TextTrackedFuture.completedFuture(null, () -> "Done");
                }, () -> "acting on ready channelFuture to retry if inactive or to return"),
            () -> "taking newly acquired channel and making it active");
    }

    /**
     * Replaces the currently tracked request-phase context.
     * The intersection bound requires the argument to be both a scoped instrumentation
     * context and one whose enclosing scope is the target-request context.
     *
     * @param messageContext the context that becomes current
     */
    private <T extends IWithTypedEnclosingScope<IReplayContexts.ITargetRequestContext> & IScopedInstrumentationAttributes> void setCurrentMessageContext(T messageContext) {
        this.currentRequestContextUnion = messageContext;
    }

    /**
     * Returns the current request-phase context viewed as a scoped instrumentation context.
     *
     * @return the object last stored by {@code setCurrentMessageContext}
     */
    private IScopedInstrumentationAttributes getCurrentRequestSpan() {
        // The field is declared with only one of the two interfaces; setCurrentMessageContext()
        // admits only values implementing both, so this narrowing cast is safe.
        return (IScopedInstrumentationAttributes) this.currentRequestContextUnion;
    }

    /**
     * Returns the target-request context that encloses the current request-phase context.
     *
     * @return the logical enclosing scope of the current context
     */
    public IReplayContexts.ITargetRequestContext getParentContext() {
        final IWithTypedEnclosingScope<IReplayContexts.ITargetRequestContext> currentContext = this.currentRequestContextUnion;
        return (IReplayContexts.ITargetRequestContext) currentContext.getLogicalEnclosingScope();
    }

    /**
     * Builds a factory that opens client connections to a fixed target.
     *
     * @param sslContext TLS context handed to every connection the factory creates
     * @param uri target whose host and port are connected to
     * @return a function that delegates to {@code createClientConnection} with the captured
     *         {@code sslContext} and {@code uri}
     */
    public static BiFunction<EventLoop, IReplayContexts.ITargetRequestContext, TrackedFuture<String, ChannelFuture>> createClientConnectionFactory(SslContext sslContext, URI uri) {
        return (loop, requestContext) -> createClientConnection(loop, sslContext, uri, requestContext);
    }

    /**
     * Opens a client connection using an initial retry delay of one millisecond.
     *
     * @param eventLoop event loop the connection is bound to
     * @param sslContext TLS context, passed through unchanged
     * @param uri target whose host and port are connected to
     * @param iTargetRequestContext request context used for tracing the connection attempt
     * @return the future produced by the five-argument overload
     */
    public static TrackedFuture<String, ChannelFuture> createClientConnection(EventLoop eventLoop, SslContext sslContext, URI uri, IReplayContexts.ITargetRequestContext iTargetRequestContext) {
        final Duration initialRetryDelay = Duration.ofMillis(1L);
        return createClientConnection(eventLoop, sslContext, uri, iTargetRequestContext, initialRetryDelay);
    }

    /**
     * Connects to the host and port of {@code uri} on the given event loop and installs the
     * connection-level handlers.
     * <p>
     * If the connect fails with an {@link Exception}, or completes with an inactive channel,
     * another attempt is scheduled after {@code duration}. Each retry doubles the delay, capped
     * at {@code MAX_WAIT_BETWEEN_CREATE_RETRIES}. A failure that is not an {@code Exception}
     * is returned as a failed future without retrying.
     *
     * @param eventLoop event loop used for the connection and for scheduling retries
     * @param sslContext TLS context, passed on to {@code initializeConnectionHandlers}
     * @param uri target whose host and port are connected to
     * @param iTargetRequestContext request context used to trace the connection attempt
     * @param duration delay before the next retry if this attempt fails
     * @return a future for the connected channel future, or a failed future if the event loop
     *         is shutting down or a non-retryable failure occurred
     */
    public static TrackedFuture<String, ChannelFuture> createClientConnection(EventLoop eventLoop, SslContext sslContext, URI uri, IReplayContexts.ITargetRequestContext iTargetRequestContext, Duration duration) {
        final IReplayContexts.IRequestConnectingContext connectingContext = iTargetRequestContext.createHttpConnectingContext();
        if (eventLoop.isShuttingDown()) {
            // No connect attempt is made on this path, so nothing else would close the scope.
            connectingContext.close();
            return TextTrackedFuture.failedFuture(
                new IllegalStateException("EventLoop is shutting down"),
                () -> "createClientConnection is failing due to the pending shutdown of the EventLoop");
        }
        final String host = uri.getHost();
        final int port = uri.getPort();
        log.atTrace().setMessage(() -> "Active - setting up backend connection to " + host + ":" + port).log();
        final Bootstrap bootstrap = new Bootstrap();
        final IReplayContexts.IChannelKeyContext channelKeyContext = ((IReplayContexts.IReplayerHttpTransactionContext) iTargetRequestContext.getLogicalEnclosingScope()).getChannelKeyContext();
        bootstrap.group(eventLoop).handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(@NonNull Channel channel) throws Exception {
                if (channel == null) {
                    throw new NullPointerException("ch is marked non-null but is null");
                }
                channel.pipeline().addFirst(NettyPacketToHttpConsumer.CONNECTION_CLOSE_HANDLER_NAME, new ConnectionClosedListenerHandler(channelKeyContext));
            }
        }).channel(NioSocketChannel.class).option(ChannelOption.AUTO_READ, false);
        final ChannelFuture connect = bootstrap.connect(host, port);
        return NettyFutureBinders.bindNettyFutureToTrackableFuture((Future<?>) connect, "").getDeferredFutureThroughHandle((connectResult, wrappedFailure) -> {
            try {
                Throwable failure = TrackedFuture.unwindPossibleCompletionException(wrappedFailure);
                if (failure != null) {
                    log.atWarn().setMessage(() -> "Caught exception while trying to get an active channel").setCause(failure).log();
                } else if (!connect.channel().isActive()) {
                    // A connect that "succeeded" with an inactive channel is handled like a failure.
                    failure = new ChannelNotActiveException();
                }
                if (failure == null) {
                    return initializeConnectionHandlers(sslContext, channelKeyContext, connect);
                }
                connectingContext.addTraceException(failure, true);
                if (failure instanceof Exception) {
                    final Duration nextDelay = Duration.ofMillis(
                        Math.min(MAX_WAIT_BETWEEN_CREATE_RETRIES.toMillis(), duration.multipliedBy(2L).toMillis()));
                    final TrackedFuture<String, ChannelFuture> retry =
                        NettyFutureBinders.bindNettyScheduleToCompletableFuture(eventLoop, duration).thenCompose(
                            scheduled -> createClientConnection(eventLoop, sslContext, uri, iTargetRequestContext, nextDelay),
                            () -> "");
                    return retry;
                }
                return TextTrackedFuture.failedFuture(failure, () -> "failed to connect");
            } finally {
                // The connecting scope covers this single attempt only; a retry creates its own.
                connectingContext.close();
            }
        }, () -> "");
    }

    /**
     * Finishes connection-level setup for a freshly connected channel.
     * Without an {@code sslContext} the pipeline is left untouched. Otherwise a client-mode
     * {@link SslHandler} is appended and the result waits for its handshake.
     *
     * @param sslContext TLS context, or {@code null} for a plaintext connection
     * @param iChannelKeyContext context used only for the trace log line
     * @param channelFuture the completed connect future
     * @return a future yielding {@code channelFuture}; already complete for plaintext,
     *         otherwise bound to the TLS handshake future
     */
    private static TrackedFuture<String, ChannelFuture> initializeConnectionHandlers(SslContext sslContext, IReplayContexts.IChannelKeyContext iChannelKeyContext, ChannelFuture channelFuture) {
        final Channel connectedChannel = channelFuture.channel();
        log.atTrace()
            .setMessage(() -> iChannelKeyContext.getChannelKey() + " Done setting up client channel & it was successful for " + connectedChannel)
            .log();
        final ChannelPipeline handlers = connectedChannel.pipeline();
        if (sslContext != null) {
            final SSLEngine tlsEngine = sslContext.newEngine(connectedChannel.alloc());
            tlsEngine.setUseClientMode(true);
            final SslHandler tlsHandler = new SslHandler(tlsEngine);
            addLoggingHandlerLast(handlers, "A");
            handlers.addLast(SSL_HANDLER_NAME, tlsHandler);
            return NettyFutureBinders
                .bindNettyFutureToTrackableFuture((Future<?>) tlsHandler.handshakeFuture(), (Supplier<String>) () -> "")
                .thenApply(handshakeResult -> channelFuture, () -> "");
        }
        return TextTrackedFuture.completedFuture(channelFuture, () -> "");
    }

    /**
     * Decides whether request-level handlers are currently installed on the channel.
     * A channel counts as idle when the last handler in its pipeline is the connection-close
     * listener or the TLS handler.
     * <p>
     * When assertions are enabled, the auto-read flag is additionally required to match the
     * result (on while in use, off while idle); a mismatch raises {@link AssertionError}.
     *
     * @param channel the channel to inspect
     * @return {@code true} if the pipeline ends in anything other than those two handlers
     */
    private static boolean channelIsInUse(Channel channel) {
        final ChannelHandler tail = channel.pipeline().last();
        final boolean inUse = !(tail instanceof ConnectionClosedListenerHandler) && !(tail instanceof SslHandler);
        // The auto-read flag is only consulted when assertions are on, as in the asserted form.
        if (!$assertionsDisabled && channel.config().isAutoRead() != inUse) {
            throw new AssertionError();
        }
        return inUse;
    }

    /**
     * Installs the per-request handlers on {@code this.channel} and turns auto-read on.
     * <p>
     * Both metering handlers are inserted directly after the connection-close handler; the
     * read-timeout, sniffer, HTTP decoder and response watcher are appended at the tail.
     * The metering callbacks switch the current tracing context to the sending or receiving
     * phase the first time bytes flow in that direction, and report the byte counts to the
     * parent context.
     *
     * @throws IllegalStateException if the channel's pipeline already has request handlers
     */
    private void initializeRequestHandlers() {
        if (!$assertionsDisabled && !this.channel.isActive()) {
            throw new AssertionError();
        }
        if (channelIsInUse(this.channel)) {
            throw new IllegalStateException("Channel " + this.channel + " is being used elsewhere already!");
        }
        final ChannelPipeline pipeline = this.channel.pipeline();
        pipeline.addAfter(CONNECTION_CLOSE_HANDLER_NAME, WRITE_COUNT_WATCHER_HANDLER_NAME, new WriteMeteringHandler(size -> {
            if (size == 0) {
                return;
            }
            // First outbound bytes while not in the sending phase: close the old phase, open "sending".
            if (!(this.currentRequestContextUnion instanceof IReplayContexts.IRequestSendingContext)) {
                getCurrentRequestSpan().close();
                setCurrentMessageContext(getParentContext().createHttpSendingContext());
            }
            getParentContext().onBytesSent(size);
        }));
        pipeline.addAfter(CONNECTION_CLOSE_HANDLER_NAME, READ_COUNT_WATCHER_HANDLER_NAME, new ReadMeteringHandler(size -> {
            if (size == 0) {
                return;
            }
            // First inbound bytes while not in the receiving phase: close the old phase, open "receiving".
            if (!(this.currentRequestContextUnion instanceof IReplayContexts.IReceivingHttpResponseContext)) {
                getCurrentRequestSpan().close();
                setCurrentMessageContext(getParentContext().createHttpReceivingContext());
            }
            getParentContext().onBytesReceived(size);
        }));
        pipeline.addLast(READ_TIMEOUT_HANDLER_NAME, new ReadTimeoutHandler(this.readTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS));
        addLoggingHandlerLast(pipeline, "B");
        pipeline.addLast(new BacksideSnifferHandler(this.responseBuilder));
        addLoggingHandlerLast(pipeline, "C");
        pipeline.addLast(new HttpResponseDecoder());
        addLoggingHandlerLast(pipeline, "D");
        pipeline.addLast(BACKSIDE_HTTP_WATCHER_HANDLER_NAME, new BacksideHttpWatcherHandler(this.responseBuilder));
        addLoggingHandlerLast(pipeline, "E");
        log.atTrace().setMessage(() -> "Added handlers to the pipeline: " + pipeline).log();
        this.channel.config().setAutoRead(true);
    }

    /**
     * Appends a Netty {@link LoggingHandler} to the pipeline when pipeline logging is configured;
     * does nothing otherwise.
     *
     * @param channelPipeline pipeline to append to
     * @param str suffix for the logger name, which is {@code "n"} followed by this value
     */
    private static void addLoggingHandlerLast(ChannelPipeline channelPipeline, String str) {
        if (PIPELINE_LOGGING_OPTIONAL.isPresent()) {
            final LogLevel level = PIPELINE_LOGGING_OPTIONAL.get();
            channelPipeline.addLast(new LoggingHandler("n" + str, level));
        }
    }

    /**
     * Strips the per-request handlers from the channel's pipeline and turns auto-read off.
     * <p>
     * The two metering handlers are removed by name; a missing one is logged and ignored.
     * Handlers are then removed from the tail until the TLS handler or the connection-close
     * listener is the last one. The current request span and its parent context are closed
     * whether or not the reset succeeds.
     */
    private void deactivateChannel() {
        try {
            final ChannelPipeline pipeline = this.channel.pipeline();
            log.atDebug()
                .setMessage(() -> "Resetting the pipeline for channel {} currently at: {}")
                .addArgument(this.channel)
                .addArgument(pipeline)
                .log();
            for (String handlerName : new String[]{WRITE_COUNT_WATCHER_HANDLER_NAME, READ_COUNT_WATCHER_HANDLER_NAME}) {
                try {
                    pipeline.remove(handlerName);
                } catch (NoSuchElementException ignored) {
                    // Best effort: the handler may never have been installed on this channel.
                    log.atWarn().setMessage(() -> "Ignoring an exception that the " + handlerName + " wasn't present").log();
                }
            }
            while (true) {
                final ChannelHandler tail = pipeline.last();
                if ((tail instanceof SslHandler) || (tail instanceof ConnectionClosedListenerHandler)) {
                    break;
                }
                pipeline.removeLast();
            }
            this.channel.config().setAutoRead(false);
            log.atDebug().setMessage(() -> "Reset the pipeline for channel " + this.channel + " back to: " + pipeline).log();
        } finally {
            // Runs exactly once on both the success and the failure path.
            getCurrentRequestSpan().close();
            getParentContext().close();
        }
    }

    /**
     * Queues a write of {@code byteBuf} behind all previously queued channel work.
     * <p>
     * If the preceding work succeeded, the buffer is written and flushed to the channel.
     * If it failed, nothing is written, the channel (when one exists) is closed and the
     * failure is passed on through the returned future.
     *
     * @param byteBuf the bytes to send
     * @return the new tail of the work chain, also stored in {@code activeChannelFuture}
     */
    @Override // org.opensearch.migrations.replay.datahandlers.IPacketConsumer
    public TrackedFuture<String, Void> consumeBytes(ByteBuf byteBuf) {
        this.activeChannelFuture = this.activeChannelFuture.getDeferredFutureThroughHandle((ignored, setupFailure) -> {
            if (setupFailure == null) {
                log.atTrace().setMessage("{}").addArgument(() -> {
                    return "outboundChannelFuture is ready. Writing packets (hash=" + System.identityHashCode(byteBuf) + "): " + httpContext() + ": " + byteBuf.toString(StandardCharsets.UTF_8);
                }).log();
                return writePacketAndUpdateFuture(byteBuf).whenComplete((writeResult, writeFailure) -> {
                    log.atTrace().setMessage(() -> "finished writing " + httpContext() + " t=" + writeFailure).log();
                }, () -> "");
            }
            log.atWarn().setMessage(() -> {
                return httpContext().getReplayerRequestKey() + "outbound channel was not set up successfully, NOT writing bytes hash=" + System.identityHashCode(byteBuf);
            }).log();
            if (this.channel != null) {
                this.channel.close();
            }
            return TrackedFuture.Factory.failedFuture(setupFailure, () -> "exception");
        }, () -> "consumeBytes - after channel is fully initialized (potentially waiting on TLS handshake)");
        log.atTrace().setMessage(() -> {
            return "Setting up write of packetData[" + byteBuf + "] hash=" + System.identityHashCode(byteBuf) + ".  Created future consumeBytes=" + this.activeChannelFuture;
        }).log();
        return this.activeChannelFuture;
    }

    /**
     * Returns the HTTP transaction context, i.e. the enclosing scope of the target-request context.
     *
     * @return the transaction context this consumer is working for
     */
    private IReplayContexts.IReplayerHttpTransactionContext httpContext() {
        final IReplayContexts.ITargetRequestContext targetRequestContext = getParentContext();
        return (IReplayContexts.IReplayerHttpTransactionContext) targetRequestContext.getLogicalEnclosingScope();
    }

    /**
     * Writes and flushes the buffer on the channel.
     *
     * @param byteBuf the bytes to send
     * @return a tracked future bound to the Netty write future
     */
    private TrackedFuture<String, Void> writePacketAndUpdateFuture(ByteBuf byteBuf) {
        final ChannelFuture pendingWrite = this.channel.writeAndFlush(byteBuf);
        return NettyFutureBinders.bindNettyFutureToTrackableFuture(
            (Future<?>) pendingWrite,
            "CompletableFuture that will wait for the netty future to fill in the completion value");
    }

    /**
     * Chains the end-of-request work behind all queued writes and returns a future for the
     * aggregated response.
     * <p>
     * If the prior work succeeded, the result is delivered through a callback registered on the
     * response watcher handler. If it failed, the result is built immediately from the response
     * builder with the failure attached. Once the result is available, the channel's pipeline
     * is reset (when a channel exists) and the tracing contexts are closed.
     *
     * @return a future that yields the aggregated response for this request
     */
    @Override // org.opensearch.migrations.replay.datahandlers.IPacketFinalizingConsumer
    public TrackedFuture<String, AggregatedRawResponse> finalizeRequest() {
        TrackedFuture<String, AggregatedRawResponse> finalizationFuture = this.activeChannelFuture.getDeferredFutureThroughHandle((ignored, priorFailure) -> {
            log.atDebug().setMessage(() -> "finalization running since all prior work has completed for " + httpContext()).log();
            // Unless response bytes are already arriving, move to the "waiting for response" phase.
            if (!(this.currentRequestContextUnion instanceof IReplayContexts.IReceivingHttpResponseContext)) {
                getCurrentRequestSpan().close();
                setCurrentMessageContext(getParentContext().createWaitingForResponseContext());
            }
            final CompletableFuture<AggregatedRawResponse> responseFuture = new CompletableFuture<>();
            final TrackedFuture<String, AggregatedRawResponse> trackedResponse =
                new TrackedFuture<>(responseFuture, () -> "NettyPacketToHttpConsumer.finalizeRequest()");
            if (priorFailure == null) {
                final BacksideHttpWatcherHandler responseWatcher =
                    (BacksideHttpWatcherHandler) this.channel.pipeline().get(BACKSIDE_HTTP_WATCHER_HANDLER_NAME);
                responseWatcher.addCallback(responseFuture::complete);
            } else {
                responseFuture.complete(this.responseBuilder.addErrorCause(priorFailure).build());
            }
            return trackedResponse;
        }, () -> "Waiting for previous consumes to set the future").map(future -> {
            return future.whenComplete((aggregatedRawResponse, completionFailure) -> {
                try {
                    if (this.channel == null) {
                        log.atTrace().setMessage(() -> "finalizeRequest().whenComplete has no channel present that needs to be to deactivated.").log();
                    } else {
                        deactivateChannel();
                    }
                } finally {
                    // NOTE(review): deactivateChannel() closes these two contexts as well, so on that
                    // path they are closed twice - presumably close() is idempotent; confirm.
                    getCurrentRequestSpan().close();
                    getParentContext().close();
                }
            });
        }, () -> "clearing pipeline");
        log.atDebug().setMessage(() -> {
            return "Chaining finalization work off of " + this.activeChannelFuture + " for " + httpContext() + ".  Returning finalization future=" + finalizationFuture;
        }).log();
        return finalizationFuture;
    }

    // Assigns the blank-final static fields declared at the top of the class.
    static {
        // True unless assertions are enabled for this class; gates the explicit AssertionError checks.
        $assertionsDisabled = !NettyPacketToHttpConsumer.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(NettyPacketToHttpConsumer.class);
        // Empty, so addLoggingHandlerLast() adds no LoggingHandlers to any pipeline.
        PIPELINE_LOGGING_OPTIONAL = Optional.empty();
        // Connection retry delays in createClientConnection() are capped at 30 seconds.
        MAX_WAIT_BETWEEN_CREATE_RETRIES = Duration.ofSeconds(30L);
    }
}
