package io.servicetalk.http.netty;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.AsyncContext;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.context.api.ContextMap;
import io.servicetalk.http.api.HttpExecutionStrategies;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpExecutionStrategyInfluencer;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpLifecycleObserver;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.NoopHttpLifecycleObserver;
import io.servicetalk.http.utils.BeforeFinallyHttpOperator;
import io.servicetalk.transport.api.ConnectionInfo;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter.class */
public abstract class AbstractLifecycleObserverHttpFilter implements HttpExecutionStrategyInfluencer {
    // Logger used by every safeReport overload and by the "unexpected message body item" warnings.
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) AbstractLifecycleObserverHttpFilter.class);
    // AsyncContext key under which trackLifecycle stores a callback when it is called without a ConnectionInfo.
    // When invoked, the callback forwards the given ConnectionInfo to HttpExchangeObserver#onConnectionSelected.
    // ExchangeContext removes the entry again once the exchange is finished (if it was the one that added it).
    static final ContextMap.Key<Consumer<ConnectionInfo>> ON_CONNECTION_SELECTED_CONSUMER = ContextMap.Key.newKey("ON_CONNECTION_SELECTED_CONSUMER", Consumer.class);
    // Root observer; asked for a fresh HttpExchangeObserver at the start of each tracked exchange. Never null.
    private final HttpLifecycleObserver observer;
    // Constructor-supplied flag. When true, the exchange counter starts at 1 and every subscription to the
    // request message body increments it; when false, the counter starts at 2 and subscriptions are not counted.
    private final boolean client;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter$ExchangeContext.class */
    /**
     * Per-exchange bookkeeping: forwards response-side events to the observers and reports
     * {@code onExchangeFinally} once every outstanding part of the exchange has terminated.
     *
     * <p>The {@code remaining} counter starts at {@code 1} when created with {@code client == true} and at
     * {@code 2} otherwise (presumably one unit per message body that still has to terminate - confirm against
     * callers). {@link #requestMessageBodyStarts()} adds one unit, {@link #decrementRemaining()} removes one.
     * Each terminal signal received through {@link TerminalSignalConsumer} removes one unit as well.
     */
    public static final class ExchangeContext implements TerminalSignalConsumer {
        private static final AtomicIntegerFieldUpdater<ExchangeContext> remainingUpdater =
                AtomicIntegerFieldUpdater.newUpdater(ExchangeContext.class, "remaining");

        private final HttpLifecycleObserver.HttpExchangeObserver onExchange;
        /** Whether the {@code ON_CONNECTION_SELECTED_CONSUMER} entry must be removed when the exchange ends. */
        private final boolean clearAsyncContext;

        /** Set by {@link #onResponse(HttpResponseMetaData)}; {@code null} until response meta-data was seen. */
        @Nullable
        private HttpLifecycleObserver.HttpResponseObserver onResponse;
        /** Only modified through {@link #remainingUpdater}. */
        private volatile int remaining;

        /**
         * @param onExchange observer of the exchange this context belongs to
         * @param client {@code true} to start the counter at 1, {@code false} to start it at 2
         * @param clearAsyncContext {@code true} if the async-context entry has to be removed on completion
         */
        private ExchangeContext(HttpLifecycleObserver.HttpExchangeObserver onExchange, boolean client, boolean clearAsyncContext) {
            this.onExchange = onExchange;
            this.remaining = client ? 1 : 2;
            this.clearAsyncContext = clearAsyncContext;
        }

        /**
         * Reports the response meta-data and remembers the returned response observer. If the exchange observer
         * fails or returns {@code null}, the no-op response observer is used instead.
         *
         * @param httpResponseMetaData meta-data of the received response
         */
        void onResponse(HttpResponseMetaData httpResponseMetaData) {
            final HttpLifecycleObserver.HttpExchangeObserver exchangeObserver = this.onExchange;
            this.onResponse = AbstractLifecycleObserverHttpFilter.safeReport(exchangeObserver::onResponse, httpResponseMetaData, exchangeObserver, "onResponse", NoopHttpLifecycleObserver.NoopHttpResponseObserver.INSTANCE);
        }

        /**
         * Reports one item of the response message body: {@link Buffer}s as data, {@link HttpHeaders} as
         * trailers. Anything else (including {@code null}) is only logged as a programming mistake.
         *
         * @param obj the message body item
         */
        void onResponseBody(Object obj) {
            assert this.onResponse != null;
            final HttpLifecycleObserver.HttpResponseObserver responseObserver = this.onResponse;
            if (obj instanceof Buffer) {
                AbstractLifecycleObserverHttpFilter.safeReport((Consumer<Buffer>) responseObserver::onResponseData, (Buffer) obj, responseObserver, "onResponseData");
            } else if (obj instanceof HttpHeaders) {
                AbstractLifecycleObserverHttpFilter.safeReport((Consumer<HttpHeaders>) responseObserver::onResponseTrailers, (HttpHeaders) obj, responseObserver, "onResponseTrailers");
            } else {
                // A null item must not turn this diagnostic into a NullPointerException.
                final String itemType = obj == null ? "null" : obj.getClass().getName();
                AbstractLifecycleObserverHttpFilter.LOGGER.warn("Programming mistake: unexpected message body item is received on the response: {}", itemType);
            }
        }

        /**
         * Reports {@code onResponseComplete} if a response observer exists, then releases one counter unit.
         */
        @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
        public void onComplete() {
            final HttpLifecycleObserver.HttpResponseObserver responseObserver = this.onResponse;
            if (responseObserver != null) {
                AbstractLifecycleObserverHttpFilter.safeReport(responseObserver::onResponseComplete, responseObserver, "onResponseComplete");
            }
            decrementRemaining();
        }

        /**
         * Reports {@code onResponseError} to the response observer, or to the exchange observer when no
         * response meta-data was seen yet, then releases one counter unit.
         *
         * @param th the failure
         */
        @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
        public void onError(Throwable th) {
            final HttpLifecycleObserver.HttpResponseObserver responseObserver = this.onResponse;
            if (responseObserver == null) {
                final HttpLifecycleObserver.HttpExchangeObserver exchangeObserver = this.onExchange;
                AbstractLifecycleObserverHttpFilter.safeReport((Consumer<Throwable>) exchangeObserver::onResponseError, th, (Object) exchangeObserver, "onResponseError");
            } else {
                AbstractLifecycleObserverHttpFilter.safeReport((Consumer<Throwable>) responseObserver::onResponseError, th, (Object) responseObserver, "onResponseError");
            }
            decrementRemaining();
        }

        /**
         * Reports {@code onResponseCancel} to the response observer, or to the exchange observer when no
         * response meta-data was seen yet, then releases one counter unit.
         */
        @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
        public void cancel() {
            final HttpLifecycleObserver.HttpResponseObserver responseObserver = this.onResponse;
            if (responseObserver == null) {
                final HttpLifecycleObserver.HttpExchangeObserver exchangeObserver = this.onExchange;
                AbstractLifecycleObserverHttpFilter.safeReport(exchangeObserver::onResponseCancel, exchangeObserver, "onResponseCancel");
            } else {
                AbstractLifecycleObserverHttpFilter.safeReport(responseObserver::onResponseCancel, responseObserver, "onResponseCancel");
            }
            decrementRemaining();
        }

        /** Adds one unit to the counter; called when the request message body is subscribed to. */
        void requestMessageBodyStarts() {
            remainingUpdater.incrementAndGet(this);
        }

        /**
         * Releases one counter unit. The call that brings the counter to zero reports {@code onExchangeFinally}
         * and, if requested at construction, removes the {@code ON_CONNECTION_SELECTED_CONSUMER} entry.
         */
        void decrementRemaining() {
            if (remainingUpdater.decrementAndGet(this) == 0) {
                final HttpLifecycleObserver.HttpExchangeObserver exchangeObserver = this.onExchange;
                AbstractLifecycleObserverHttpFilter.safeReport(exchangeObserver::onExchangeFinally, exchangeObserver, "onExchangeFinally");
                if (this.clearAsyncContext) {
                    AsyncContext.remove(AbstractLifecycleObserverHttpFilter.ON_CONNECTION_SELECTED_CONSUMER);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/servicetalk/http/netty/AbstractLifecycleObserverHttpFilter$NoopSubscriber.class */
    /**
     * A stateless {@link PublisherSource.Subscriber} that ignores every signal it receives. It never touches
     * the {@link PublisherSource.Subscription} handed to it.
     */
    public static final class NoopSubscriber implements PublisherSource.Subscriber<Object> {
        /** The only instance; the class has no state, so it can be shared freely. */
        static final NoopSubscriber INSTANCE = new NoopSubscriber();

        private NoopSubscriber() {
            // Singleton - use INSTANCE.
        }

        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onSubscribe(final PublisherSource.Subscription subscription) {
            // Deliberately ignored.
        }

        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onNext(@Nullable final Object obj) {
            // Deliberately ignored.
        }

        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onError(final Throwable th) {
            // Deliberately ignored.
        }

        @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
        public void onComplete() {
            // Deliberately ignored.
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the filter base.
     *
     * @param httpLifecycleObserver observer that receives the lifecycle events; must not be {@code null}
     * @param z the "client" flag: {@code true} makes each exchange start its counter at 1 and count request
     * body subscriptions, {@code false} makes it start at 2
     * @throws NullPointerException if {@code httpLifecycleObserver} is {@code null}
     */
    public AbstractLifecycleObserverHttpFilter(final HttpLifecycleObserver httpLifecycleObserver, final boolean z) {
        this.client = z;
        this.observer = Objects.requireNonNull(httpLifecycleObserver);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps one request/response exchange so that every stage of it is reported to the configured
     * {@link HttpLifecycleObserver}. All work is deferred until the returned {@link Single} is subscribed to.
     *
     * <p>Every observer callback is invoked through {@code safeReport}, so a misbehaving observer is logged
     * and never alters the outcome of the exchange.
     *
     * @param connectionInfo the connection used for the exchange, or {@code null} if it is not known yet; in
     * the latter case a callback is stored in {@link AsyncContext} under
     * {@code ON_CONNECTION_SELECTED_CONSUMER} so the connection can be reported later
     * @param streamingHttpRequest the request to track
     * @param function produces the response for the (instrumented) request
     * @return the instrumented response {@link Single}; a failed {@link Single} if building it threw
     */
    public final Single<StreamingHttpResponse> trackLifecycle(@Nullable ConnectionInfo connectionInfo, StreamingHttpRequest streamingHttpRequest, Function<StreamingHttpRequest, Single<StreamingHttpResponse>> function) {
        return Single.defer(() -> {
            final HttpLifecycleObserver.HttpExchangeObserver exchangeObserver = safeReport((Supplier<HttpLifecycleObserver.HttpExchangeObserver>) this.observer::onNewExchange, this.observer, "onNewExchange", NoopHttpLifecycleObserver.NoopHttpExchangeObserver.INSTANCE);

            final boolean clearAsyncContext;
            if (connectionInfo != null) {
                safeReport((Consumer<ConnectionInfo>) exchangeObserver::onConnectionSelected, connectionInfo, exchangeObserver, "onConnectionSelected");
                clearAsyncContext = false;
            } else {
                // Connection not known yet: leave a callback in the async context for whoever selects it.
                AsyncContext.put(ON_CONNECTION_SELECTED_CONSUMER, selectedConnection ->
                        safeReport((Consumer<ConnectionInfo>) exchangeObserver::onConnectionSelected, selectedConnection, exchangeObserver, "onConnectionSelected"));
                clearAsyncContext = true;
            }

            final ExchangeContext exchangeContext = new ExchangeContext(exchangeObserver, this.client, clearAsyncContext);
            final HttpLifecycleObserver.HttpRequestObserver requestObserver = safeReport(exchangeObserver::onRequest, streamingHttpRequest, exchangeObserver, "onRequest", NoopHttpLifecycleObserver.NoopHttpRequestObserver.INSTANCE);
            try {
                return function.apply(streamingHttpRequest.transformMessageBody(publisher -> {
                    if (this.client) {
                        // Count every subscription to the request body so the exchange waits for it to end.
                        publisher = publisher.beforeSubscriber(() -> {
                            exchangeContext.requestMessageBodyStarts();
                            return NoopSubscriber.INSTANCE;
                        });
                    }
                    return publisher
                            .beforeOnNext(item -> reportRequestBodyItem(requestObserver, item))
                            .beforeFinally(requestBodyTerminalObserver(requestObserver, exchangeContext));
                })).liftSync(new BeforeFinallyHttpOperator(exchangeContext, true)).map(response -> {
                    exchangeContext.onResponse(response);
                    return response.transformMessageBody(body -> body.beforeOnNext(exchangeContext::onResponseBody));
                }).shareContextOnSubscribe();
            } catch (Throwable th) {
                // Routed through safeReport like every other callback, so an observer failure cannot
                // replace the original cause.
                // NOTE(review): on this path onExchangeFinally is not reported and the
                // ON_CONNECTION_SELECTED_CONSUMER entry is not removed - confirm this is intended.
                safeReport((Consumer<Throwable>) exchangeObserver::onResponseError, th, (Object) exchangeObserver, "onResponseError");
                return Single.<StreamingHttpResponse>failed(th).shareContextOnSubscribe();
            }
        });
    }

    /** Reports one request body item: buffers as data, headers as trailers; anything else is logged. */
    private static void reportRequestBodyItem(HttpLifecycleObserver.HttpRequestObserver requestObserver, @Nullable Object item) {
        if (item instanceof Buffer) {
            safeReport((Consumer<Buffer>) requestObserver::onRequestData, (Buffer) item, requestObserver, "onRequestData");
        } else if (item instanceof HttpHeaders) {
            safeReport((Consumer<HttpHeaders>) requestObserver::onRequestTrailers, (HttpHeaders) item, requestObserver, "onRequestTrailers");
        } else {
            // A null item must not turn this diagnostic into a NullPointerException.
            final String itemType = item == null ? "null" : item.getClass().getName();
            LOGGER.warn("Programming mistake: unexpected message body item is received on the request: {}", itemType);
        }
    }

    /** Builds the consumer that reports the request body's terminal signal and releases one counter unit. */
    private static TerminalSignalConsumer requestBodyTerminalObserver(HttpLifecycleObserver.HttpRequestObserver requestObserver, ExchangeContext exchangeContext) {
        return new TerminalSignalConsumer() {
            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void onComplete() {
                AbstractLifecycleObserverHttpFilter.safeReport(requestObserver::onRequestComplete, requestObserver, "onRequestComplete");
                exchangeContext.decrementRemaining();
            }

            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void onError(Throwable th) {
                AbstractLifecycleObserverHttpFilter.safeReport((Consumer<Throwable>) requestObserver::onRequestError, th, (Object) requestObserver, "onRequestError");
                exchangeContext.decrementRemaining();
            }

            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void cancel() {
                AbstractLifecycleObserverHttpFilter.safeReport(requestObserver::onRequestCancel, requestObserver, "onRequestCancel");
                exchangeContext.decrementRemaining();
            }
        };
    }

    /**
     * Declares that this filter needs no offloading.
     *
     * @return {@link HttpExecutionStrategies#offloadNone()}
     */
    @Override // io.servicetalk.http.api.HttpExecutionStrategyInfluencer, io.servicetalk.transport.api.ExecutionStrategyInfluencer
    public final HttpExecutionStrategy requiredOffloads() {
        return HttpExecutionStrategies.offloadNone();
    }

    /**
     * Alias kept only for code that still refers to the decompiler-generated name.
     *
     * @return same as {@link #requiredOffloads()}
     * @deprecated use {@link #requiredOffloads()}; this name was produced by the decompiler and overrides nothing.
     */
    @Deprecated
    public final HttpExecutionStrategy requiredOffloads2() {
        return requiredOffloads();
    }

    /**
     * Returns the given strategy as-is; this filter contributes nothing to it.
     *
     * @param httpExecutionStrategy the strategy computed so far
     * @return {@code httpExecutionStrategy}, unchanged
     */
    @Override // io.servicetalk.http.api.HttpExecutionStrategyInfluencer
    public final HttpExecutionStrategy influenceStrategy(final HttpExecutionStrategy httpExecutionStrategy) {
        final HttpExecutionStrategy unchanged = httpExecutionStrategy;
        return unchanged;
    }

    /**
     * Invokes a value-producing observer callback, shielding the caller from its failures.
     *
     * @param supplier the callback to invoke
     * @param obj the observer the callback belongs to (only used in the log message)
     * @param str name of the reported event (only used in the log message)
     * @param t fallback returned when the callback throws or yields {@code null}
     * @param <T> type of the produced value
     * @return the callback's non-null result, otherwise {@code t}
     */
    private static <T> T safeReport(Supplier<T> supplier, Object obj, String str, T t) {
        T outcome;
        try {
            outcome = Objects.requireNonNull(supplier.get());
        } catch (Throwable cause) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", obj, str, cause);
            outcome = t;
        }
        return outcome;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes a one-argument, value-producing observer callback, shielding the caller from its failures.
     *
     * @param function the callback to invoke
     * @param a the argument handed to the callback
     * @param obj the observer the callback belongs to (only used in the log message)
     * @param str name of the reported event (only used in the log message)
     * @param t fallback returned when the callback throws or yields {@code null}
     * @param <T> type of the produced value
     * @param <A> type of the callback argument
     * @return the callback's non-null result, otherwise {@code t}
     */
    public static <T, A> T safeReport(Function<A, T> function, A a, Object obj, String str, T t) {
        T outcome;
        try {
            outcome = Objects.requireNonNull(function.apply(a));
        } catch (Throwable cause) {
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", obj, str, cause);
            outcome = t;
        }
        return outcome;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes a one-argument observer callback; anything it throws is logged at WARN level and swallowed.
     *
     * @param consumer the callback to invoke
     * @param t the argument handed to the callback
     * @param obj the observer the callback belongs to (only used in the log message)
     * @param str name of the reported event (only used in the log message)
     * @param <T> type of the callback argument
     */
    public static <T> void safeReport(Consumer<T> consumer, T t, Object obj, String str) {
        try {
            consumer.accept(t);
        } catch (Throwable cause) {
            // Best effort by design: observers must never break the exchange they observe.
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", obj, str, cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes a no-argument observer callback; anything it throws is logged at WARN level and swallowed.
     *
     * @param runnable the callback to invoke
     * @param obj the observer the callback belongs to (only used in the log message)
     * @param str name of the reported event (only used in the log message)
     */
    public static void safeReport(Runnable runnable, Object obj, String str) {
        try {
            runnable.run();
        } catch (Throwable cause) {
            // Best effort by design: observers must never break the exchange they observe.
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", obj, str, cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes an error-reporting observer callback; anything it throws is logged at WARN level and swallowed.
     * The error being reported is attached to the callback's own failure as a suppressed exception so both
     * show up in the log.
     *
     * @param consumer the callback to invoke
     * @param th the error being reported to the callback
     * @param obj the observer the callback belongs to (only used in the log message)
     * @param str name of the reported event (only used in the log message)
     */
    public static void safeReport(Consumer<Throwable> consumer, Throwable th, Object obj, String str) {
        try {
            consumer.accept(th);
        } catch (Throwable th2) {
            // Throwable#addSuppressed rejects self-suppression (IllegalArgumentException) and null
            // (NullPointerException). A callback that simply rethrows what it was given must not make
            // this "safe" wrapper throw.
            if (th != null && th2 != th) {
                th2.addSuppressed(th);
            }
            LOGGER.warn("Unexpected exception from {} while reporting a '{}' event", obj, str, th2);
        }
    }
}
