package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;
import io.servicetalk.oio.api.internal.PayloadWriterUtils;
import java.io.IOException;
import java.util.Objects;

/* loaded from: input_file:io/servicetalk/http/api/BlockingStreamingToStreamingService.class */
/**
 * Adapts a {@link BlockingStreamingHttpService} to the asynchronous streaming service contract.
 *
 * <p>{@link #handle} returns a {@link Single}; when that {@link Single} is subscribed, the wrapped blocking
 * service is invoked on the subscribing thread, and whatever it writes through its payload writer is exposed
 * as the payload body of the emitted {@link StreamingHttpResponse}.
 */
final class BlockingStreamingToStreamingService extends AbstractServiceAdapterHolder {
    /** Strategy substituted when the caller passes the value returned by {@code HttpExecutionStrategies.defaultStrategy()}. */
    private static final HttpExecutionStrategy DEFAULT_STRATEGY = DefaultHttpExecutionStrategy.OFFLOAD_RECEIVE_META_STRATEGY;

    /** The blocking service every call is delegated to. */
    private final BlockingStreamingHttpService original;

    /* loaded from: input_file:io/servicetalk/http/api/BlockingStreamingToStreamingService$BufferHttpPayloadWriter.class */
    /**
     * {@link HttpPayloadWriter} handed to the blocking service. All write/flush/close calls are forwarded to a
     * {@link ConnectablePayloadWriter}, whose {@link #connect()} side is used as the response payload; the
     * trailers instance is simply held and returned on request.
     */
    private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
        private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter<>();
        private final HttpHeaders trailers;

        /**
         * @param httpHeaders the trailers object returned by {@link #trailers()}
         */
        BufferHttpPayloadWriter(HttpHeaders httpHeaders) {
            this.trailers = httpHeaders;
        }

        @Override // io.servicetalk.oio.api.PayloadWriter
        public void write(Buffer buffer) throws IOException {
            payloadWriter.write(buffer);
        }

        @Override // java.io.Flushable
        public void flush() throws IOException {
            payloadWriter.flush();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            payloadWriter.close();
        }

        @Override // io.servicetalk.oio.api.PayloadWriter
        public void close(Throwable th) throws IOException {
            payloadWriter.close(th);
        }

        @Override // io.servicetalk.http.api.TrailersHolder
        public HttpHeaders trailers() {
            return trailers;
        }

        /**
         * @return the {@link Publisher} obtained from the underlying {@link ConnectablePayloadWriter}
         */
        Publisher<Buffer> connect() {
            return payloadWriter.connect();
        }
    }

    /**
     * Creates the adapter.
     *
     * @param blockingStreamingHttpService the blocking service to wrap; must not be {@code null}
     * @param httpExecutionStrategy the strategy passed to the super class; replaced by {@link #DEFAULT_STRATEGY}
     *                              when it is the very instance returned by
     *                              {@code HttpExecutionStrategies.defaultStrategy()}
     * @throws NullPointerException if {@code blockingStreamingHttpService} is {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockingStreamingToStreamingService(BlockingStreamingHttpService blockingStreamingHttpService, HttpExecutionStrategy httpExecutionStrategy) {
        // Identity comparison on purpose: only the exact default-strategy instance is swapped out.
        super(HttpExecutionStrategies.defaultStrategy() == httpExecutionStrategy ? DEFAULT_STRATEGY : httpExecutionStrategy);
        this.original = Objects.requireNonNull(blockingStreamingHttpService);
    }

    /**
     * Invokes the wrapped blocking service when the returned {@link Single} is subscribed.
     *
     * <p>The response is emitted to the subscriber from the callback given to
     * {@code DefaultBlockingStreamingHttpServerResponse}; its payload is the writer's connected
     * {@link Publisher} merged with an internal completable processor, optionally followed by the trailers.
     *
     * @param httpServiceContext the context used for header/trailer factories and the buffer allocator
     * @param streamingHttpRequest the request, converted with {@code toBlockingStreamingRequest()} before delegation
     * @param streamingHttpResponseFactory not used by this implementation
     * @return a {@link Single} that runs the blocking service on subscribe
     */
    @Override // io.servicetalk.http.api.StreamingHttpService
    public Single<StreamingHttpResponse> handle(final HttpServiceContext httpServiceContext, final StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
        return new Single<StreamingHttpResponse>() { // from class: io.servicetalk.http.api.BlockingStreamingToStreamingService.1
            @Override // io.servicetalk.concurrent.api.Single
            protected void handleSubscribe(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
                // Bound to the subscribing thread, which is also the thread that runs the blocking service below.
                final ThreadInterruptingCancellable cancellable = new ThreadInterruptingCancellable(Thread.currentThread());
                // NOTE(review): this outer try spans the whole body, so anything escaping the inner catch is also
                // reported through handleExceptionFromOnSubscribe - confirm that this is intended.
                try {
                    subscriber.onSubscribe(cancellable);
                    // Terminated after the blocking call returns (onComplete) or fails (onError); it is merged
                    // into the payload so its terminal signal is part of the response body stream.
                    final CompletableSource.Processor exceptionProcessor = Processors.newCompletableProcessor();
                    final BufferHttpPayloadWriter payloadWriter = new BufferHttpPayloadWriter(httpServiceContext.headersFactory().newTrailers());
                    DefaultBlockingStreamingHttpServerResponse response = null;
                    try {
                        // NOTE(review): mo1296executionContext is a decompiler-assigned name (presumably
                        // executionContext()); it must stay in sync with the declaring type.
                        response = new DefaultBlockingStreamingHttpServerResponse(HttpResponseStatus.OK, streamingHttpRequest.version(), httpServiceContext.headersFactory().newHeaders(), payloadWriter, httpServiceContext.mo1296executionContext().bufferAllocator(), metaData -> {
                            try {
                                final HttpHeaders headers = metaData.headers();
                                final HttpProtocolVersion version = metaData.version();
                                // Trailers are appended for major versions above 1 or when the response is already chunked.
                                boolean addTrailers = version.major() > 1 || HeaderUtils.isTransferEncodingChunked(headers);
                                if (!addTrailers && HttpProtocolVersion.h1TrailersSupported(version) && !HeaderUtils.hasContentLength(headers) && !HttpRequestMethod.HEAD.equals(streamingHttpRequest.method())) {
                                    // No content-length and not a HEAD request: switch the response to chunked encoding.
                                    headers.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                    addTrailers = true;
                                }
                                Publisher<Object> messageBody = SourceAdapters.fromSource(exceptionProcessor).merge(payloadWriter.connect());
                                if (addTrailers) {
                                    messageBody = messageBody.concat(Single.succeeded(payloadWriter.trailers()));
                                }
                                subscriber.onSuccess(new DefaultStreamingHttpResponse(metaData.status(), version, headers, metaData.context0(), httpServiceContext.mo1296executionContext().bufferAllocator(), messageBody.beforeSubscription(() -> {
                                    return new PublisherSource.Subscription() { // from class: io.servicetalk.http.api.BlockingStreamingToStreamingService.1.1
                                        @Override // io.servicetalk.concurrent.PublisherSource.Subscription
                                        public void request(long n) {
                                            // Demand is intentionally ignored here.
                                        }

                                        @Override // io.servicetalk.concurrent.Cancellable
                                        public void cancel() {
                                            // Cancelling the payload cancels the thread-bound cancellable as well.
                                            cancellable.cancel();
                                        }
                                    };
                                }), DefaultPayloadInfo.forTransportReceive(false, version, headers), httpServiceContext.headersFactory()));
                            } catch (Throwable t) {
                                // Tell the subscriber first, then let the failure reach the code that triggered this callback.
                                subscriber.onError(t);
                                throw t;
                            }
                        });
                        BlockingStreamingToStreamingService.this.original.handle(httpServiceContext, streamingHttpRequest.toBlockingStreamingRequest(), response);
                        exceptionProcessor.onComplete();
                        cancellable.setDone();
                    } catch (Throwable cause) {
                        cancellable.setDone(cause);
                        if (response == null || response.markMetaSent()) {
                            // Taken when no response object exists or markMetaSent() returns true:
                            // the failure goes to the Single's subscriber.
                            SubscriberUtils.safeOnError(subscriber, cause);
                        } else {
                            // Otherwise fail the payload stream and always close the writer with the same cause.
                            try {
                                exceptionProcessor.onError(cause);
                            } finally {
                                PayloadWriterUtils.safeClose(payloadWriter, cause);
                            }
                        }
                    }
                } catch (Throwable onSubscribeCause) {
                    SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, onSubscribeCause);
                }
            }
        };
    }

    /**
     * @return a {@link Completable} that calls {@code close()} on the wrapped blocking service
     */
    @Override // io.servicetalk.http.api.StreamingHttpService, io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsync() {
        return Completable.fromCallable(() -> {
            original.close();
            return null;
        });
    }

    /**
     * @return a {@link Completable} that calls {@code closeGracefully()} on the wrapped blocking service
     */
    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsyncGracefully() {
        return Completable.fromCallable(() -> {
            original.closeGracefully();
            return null;
        });
    }
}
