package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConsumableEvent;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.TerminalSignalConsumer;
import io.servicetalk.http.api.FilterableStreamingHttpConnection;
import io.servicetalk.http.api.HttpApiConversions;
import io.servicetalk.http.api.HttpConnectionContext;
import io.servicetalk.http.api.HttpContextKeys;
import io.servicetalk.http.api.HttpEventKey;
import io.servicetalk.http.api.HttpExecutionContext;
import io.servicetalk.http.api.HttpExecutionStrategy;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpMetaData;
import io.servicetalk.http.api.HttpRequestMetaData;
import io.servicetalk.http.api.HttpRequestMethod;
import io.servicetalk.http.api.HttpResponseMetaData;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponses;
import io.servicetalk.http.netty.ReservableRequestConcurrencyControllers;
import io.servicetalk.transport.api.IoThreadFactory;
import io.servicetalk.transport.netty.internal.FlushStrategies;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import java.util.Objects;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/netty/AbstractStreamingHttpConnection.class */
public abstract class AbstractStreamingHttpConnection<CC extends NettyConnectionContext> implements FilterableStreamingHttpConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) AbstractStreamingHttpConnection.class);
    static final ReservableRequestConcurrencyControllers.IgnoreConsumedEvent<Integer> ZERO_MAX_CONCURRENCY_EVENT = new ReservableRequestConcurrencyControllers.IgnoreConsumedEvent<>(0);
    static final HttpEventKey<ConsumableEvent<Integer>> MAX_CONCURRENCY_NO_OFFLOADING = HttpEventKey.newKey("max-concurrency-no-offloading", generify(ConsumableEvent.class));
    final CC connection;
    private final HttpConnectionContext connectionContext;
    private final Publisher<? extends ConsumableEvent<Integer>> maxConcurrencySetting;
    private final StreamingHttpRequestResponseFactory reqRespFactory;
    private final HttpHeadersFactory headersFactory;
    private final boolean allowDropTrailersReadFromTransport;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractStreamingHttpConnection(CC cc, int i, StreamingHttpRequestResponseFactory streamingHttpRequestResponseFactory, HttpHeadersFactory httpHeadersFactory, boolean z) {
        this.connection = (CC) Objects.requireNonNull(cc);
        this.connectionContext = new DefaultNettyHttpConnectionContext(cc);
        this.reqRespFactory = (StreamingHttpRequestResponseFactory) Objects.requireNonNull(streamingHttpRequestResponseFactory);
        this.maxConcurrencySetting = Publisher.from(new ReservableRequestConcurrencyControllers.IgnoreConsumedEvent(Integer.valueOf(i))).concat(this.connection.onClosing()).concat(Single.succeeded(ZERO_MAX_CONCURRENCY_EVENT));
        this.headersFactory = httpHeadersFactory;
        this.allowDropTrailersReadFromTransport = z;
    }

    @Override // io.servicetalk.http.api.FilterableStreamingHttpConnection
    public final HttpConnectionContext connectionContext() {
        return this.connectionContext;
    }

    @Override // io.servicetalk.http.api.FilterableStreamingHttpConnection
    public final <T> Publisher<? extends T> transportEventStream(HttpEventKey<T> httpEventKey) {
        if (httpEventKey == MAX_CONCURRENCY_NO_OFFLOADING) {
            return (Publisher<? extends T>) this.maxConcurrencySetting;
        }
        if (httpEventKey == HttpEventKey.MAX_CONCURRENCY) {
            return (Publisher<? extends T>) this.maxConcurrencySetting.publishOn(mo1313executionContext().executionStrategy().isEventOffloaded() ? mo1313executionContext().executor() : Executors.immediate(), IoThreadFactory.IoThread::currentThreadIsIoThread);
        }
        return Publisher.failed(new IllegalArgumentException("Unknown key: " + httpEventKey));
    }

    private Single<StreamingHttpResponse> makeRequest(final HttpRequestMetaData httpRequestMetaData, Publisher<Object> publisher, @Nullable FlushStrategy flushStrategy) {
        return writeAndRead(publisher, flushStrategy).beforeFinally(new TerminalSignalConsumer() { // from class: io.servicetalk.http.netty.AbstractStreamingHttpConnection.1
            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void onComplete() {
            }

            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void onError(Throwable th) {
            }

            @Override // io.servicetalk.concurrent.api.TerminalSignalConsumer
            public void cancel() {
                if (AbstractStreamingHttpConnection.this.connectionContext().protocol().major() < 2) {
                    AbstractStreamingHttpConnection.LOGGER.debug("{} {} request was cancelled before receiving the full response, closing this {} connection to stop receiving more data", AbstractStreamingHttpConnection.this.connectionContext, httpRequestMetaData, AbstractStreamingHttpConnection.this.connectionContext.protocol());
                    AbstractStreamingHttpConnection.this.closeAsync().subscribe();
                }
            }
        }).liftSyncToSingle(new SpliceFlatStreamToMetaSingle(this::newSplicedResponse));
    }

    @Override // io.servicetalk.http.api.StreamingHttpRequester
    public Single<StreamingHttpResponse> request(StreamingHttpRequest streamingHttpRequest) {
        return Single.defer(() -> {
            Publisher<Object> concatDeferSubscribe;
            if (HeaderUtils.canAddRequestContentLength(streamingHttpRequest)) {
                concatDeferSubscribe = HeaderUtils.setRequestContentLength(connectionContext().protocol(), streamingHttpRequest);
            } else {
                Publisher<Object> messageBody = streamingHttpRequest.messageBody();
                if (HeaderUtils.emptyMessageBody(streamingHttpRequest, messageBody)) {
                    concatDeferSubscribe = HeaderUtils.flatEmptyMessage(connectionContext().protocol(), streamingHttpRequest, messageBody, false);
                } else {
                    concatDeferSubscribe = Single.succeeded(streamingHttpRequest).concatDeferSubscribe(messageBody);
                    if (HeaderUtils.shouldAppendTrailers(connectionContext().protocol(), streamingHttpRequest)) {
                        concatDeferSubscribe = concatDeferSubscribe.scanWithMapper(HeaderUtils::appendTrailersMapper);
                    }
                }
                HeaderUtils.addRequestTransferEncodingIfNecessary(streamingHttpRequest);
            }
            HttpExecutionStrategy requestExecutionStrategy = requestExecutionStrategy(streamingHttpRequest, mo1313executionContext().executionStrategy());
            if (requestExecutionStrategy.isSendOffloaded()) {
                concatDeferSubscribe = concatDeferSubscribe.subscribeOn(this.connectionContext.mo1313executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread);
            }
            Single<StreamingHttpResponse> makeRequest = makeRequest(streamingHttpRequest, concatDeferSubscribe, determineFlushStrategyForApi(streamingHttpRequest));
            if (requestExecutionStrategy.isMetadataReceiveOffloaded()) {
                makeRequest = makeRequest.publishOn(this.connectionContext.mo1313executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread);
            }
            if (requestExecutionStrategy.isDataReceiveOffloaded()) {
                makeRequest = makeRequest.map(streamingHttpResponse -> {
                    return streamingHttpResponse.transformMessageBody(publisher -> {
                        return publisher.publishOn(this.connectionContext.mo1313executionContext().executor(), IoThreadFactory.IoThread::currentThreadIsIoThread);
                    });
                });
            }
            return makeRequest.shareContextOnSubscribe();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static HttpExecutionStrategy requestExecutionStrategy(HttpRequestMetaData httpRequestMetaData, HttpExecutionStrategy httpExecutionStrategy) {
        HttpExecutionStrategy httpExecutionStrategy2 = (HttpExecutionStrategy) httpRequestMetaData.context().get(HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY);
        return httpExecutionStrategy2 != null ? httpExecutionStrategy2 : httpExecutionStrategy;
    }

    @Nullable
    private static FlushStrategy determineFlushStrategyForApi(HttpRequestMetaData httpRequestMetaData) {
        if (!isSafeToAggregateOrEmpty(httpRequestMetaData) || HeaderUtils.REQ_EXPECT_CONTINUE.test(httpRequestMetaData)) {
            return null;
        }
        return FlushStrategies.flushOnEnd();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isSafeToAggregateOrEmpty(HttpMetaData httpMetaData) {
        return HttpApiConversions.isPayloadEmpty(httpMetaData) || HttpApiConversions.isSafeToAggregate(httpMetaData);
    }

    @Override // io.servicetalk.http.api.StreamingHttpRequester
    /* renamed from: executionContext */
    public final HttpExecutionContext mo1313executionContext() {
        return this.connectionContext.mo1313executionContext();
    }

    protected abstract Publisher<Object> writeAndRead(Publisher<Object> publisher, @Nullable FlushStrategy flushStrategy);

    private StreamingHttpResponse newSplicedResponse(HttpResponseMetaData httpResponseMetaData, Publisher<Object> publisher) {
        return StreamingHttpResponses.newTransportResponse(httpResponseMetaData.status(), httpResponseMetaData.version(), httpResponseMetaData.headers(), this.connectionContext.mo1313executionContext().bufferAllocator(), publisher, this.allowDropTrailersReadFromTransport, this.headersFactory);
    }

    @Override // io.servicetalk.http.api.StreamingHttpRequestFactory
    public final StreamingHttpRequest newRequest(HttpRequestMethod httpRequestMethod, String str) {
        return this.reqRespFactory.newRequest(httpRequestMethod, str);
    }

    @Override // io.servicetalk.http.api.StreamingHttpRequester
    public final StreamingHttpResponseFactory httpResponseFactory() {
        return this.reqRespFactory;
    }

    @Override // io.servicetalk.concurrent.api.ListenableAsyncCloseable
    public final Completable onClose() {
        return this.connectionContext.onClose();
    }

    @Override // io.servicetalk.concurrent.api.ListenableAsyncCloseable
    public final Completable onClosing() {
        return this.connectionContext.onClosing();
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public final Completable closeAsync() {
        return this.connectionContext.closeAsync();
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public final Completable closeAsyncGracefully() {
        return this.connectionContext.closeAsyncGracefully();
    }

    public final String toString() {
        return getClass().getName() + '(' + this.connectionContext + ')';
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <T> Class<T> generify(Class<?> cls) {
        return cls;
    }
}
