package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Processors;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.ConnectablePayloadWriter;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.concurrent.internal.ThreadInterruptingCancellable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/api/BlockingStreamingToStreamingService.class */
public final class BlockingStreamingToStreamingService extends AbstractServiceAdapterHolder {
    private static final HttpExecutionStrategy DEFAULT_STRATEGY = HttpExecutionStrategies.OFFLOAD_RECEIVE_META_STRATEGY;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BlockingStreamingToStreamingService.class);
    private final BlockingStreamingHttpService original;

    /* loaded from: input_file:io/servicetalk/http/api/BlockingStreamingToStreamingService$BufferHttpPayloadWriter.class */
    private static final class BufferHttpPayloadWriter implements HttpPayloadWriter<Buffer> {
        private static final AtomicIntegerFieldUpdater<BufferHttpPayloadWriter> subscriberCompleteUpdater = AtomicIntegerFieldUpdater.newUpdater(BufferHttpPayloadWriter.class, "subscriberComplete");
        private volatile int subscriberComplete;
        private final ConnectablePayloadWriter<Buffer> payloadWriter = new ConnectablePayloadWriter<>();
        private final HttpHeaders trailers;
        private final CompletableSource.Subscriber subscriber;

        BufferHttpPayloadWriter(HttpHeaders httpHeaders, CompletableSource.Subscriber subscriber) {
            this.trailers = httpHeaders;
            this.subscriber = subscriber;
        }

        @Override // io.servicetalk.oio.api.PayloadWriter
        public void write(Buffer buffer) throws IOException {
            this.payloadWriter.write(buffer);
        }

        @Override // java.io.Flushable
        public void flush() throws IOException {
            this.payloadWriter.flush();
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            try {
                this.payloadWriter.close();
            } finally {
                if (markSubscriberComplete()) {
                    this.subscriber.onComplete();
                }
            }
        }

        @Override // io.servicetalk.http.api.TrailersHolder
        public HttpHeaders trailers() {
            return this.trailers;
        }

        Publisher<Buffer> connect() {
            return this.payloadWriter.connect();
        }

        boolean markSubscriberComplete() {
            return subscriberCompleteUpdater.compareAndSet(this, 0, 1);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockingStreamingToStreamingService(BlockingStreamingHttpService blockingStreamingHttpService, HttpExecutionStrategyInfluencer httpExecutionStrategyInfluencer) {
        super(httpExecutionStrategyInfluencer.influenceStrategy(DEFAULT_STRATEGY));
        this.original = (BlockingStreamingHttpService) Objects.requireNonNull(blockingStreamingHttpService);
    }

    @Override // io.servicetalk.http.api.StreamingHttpService
    public Single<StreamingHttpResponse> handle(final HttpServiceContext httpServiceContext, final StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
        return new Single<StreamingHttpResponse>() { // from class: io.servicetalk.http.api.BlockingStreamingToStreamingService.1
            @Override // io.servicetalk.concurrent.api.Single
            protected void handleSubscribe(SingleSource.Subscriber<? super StreamingHttpResponse> subscriber) {
                ThreadInterruptingCancellable threadInterruptingCancellable = new ThreadInterruptingCancellable(Thread.currentThread());
                try {
                    subscriber.onSubscribe(threadInterruptingCancellable);
                    CompletableSource.Processor newCompletableProcessor = Processors.newCompletableProcessor();
                    DefaultBlockingStreamingHttpServerResponse defaultBlockingStreamingHttpServerResponse = null;
                    BufferHttpPayloadWriter bufferHttpPayloadWriter = null;
                    try {
                        BufferHttpPayloadWriter bufferHttpPayloadWriter2 = new BufferHttpPayloadWriter(httpServiceContext.headersFactory().newTrailers(), newCompletableProcessor);
                        bufferHttpPayloadWriter = bufferHttpPayloadWriter2;
                        StreamingHttpRequest streamingHttpRequest2 = streamingHttpRequest;
                        HttpServiceContext httpServiceContext2 = httpServiceContext;
                        defaultBlockingStreamingHttpServerResponse = new DefaultBlockingStreamingHttpServerResponse(HttpResponseStatus.OK, streamingHttpRequest.version(), httpServiceContext.headersFactory().newHeaders(), bufferHttpPayloadWriter2, httpServiceContext.mo898executionContext().bufferAllocator(), httpResponseMetaData -> {
                            try {
                                HttpHeaders headers = httpResponseMetaData.headers();
                                boolean isTransferEncodingChunked = HeaderUtils.isTransferEncodingChunked(headers);
                                if (!isTransferEncodingChunked && !HttpProtocolVersion.HTTP_1_0.equals(httpResponseMetaData.version()) && !HeaderUtils.hasContentLength(headers) && !HttpRequestMethod.HEAD.equals(streamingHttpRequest2.method())) {
                                    headers.add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
                                    isTransferEncodingChunked = true;
                                }
                                Publisher merge = SourceAdapters.fromSource(newCompletableProcessor).merge((Publisher) bufferHttpPayloadWriter2.connect().map(buffer -> {
                                    return buffer;
                                }));
                                if (isTransferEncodingChunked) {
                                    merge = merge.concat(succeeded(bufferHttpPayloadWriter2.trailers()));
                                }
                                subscriber.onSuccess(StreamingHttpResponses.newTransportResponse(httpResponseMetaData.status(), httpResponseMetaData.version(), httpResponseMetaData.headers(), httpServiceContext2.mo898executionContext().bufferAllocator(), merge.beforeSubscription(() -> {
                                    return new PublisherSource.Subscription() { // from class: io.servicetalk.http.api.BlockingStreamingToStreamingService.1.1
                                        @Override // io.servicetalk.concurrent.PublisherSource.Subscription
                                        public void request(long j) {
                                        }

                                        @Override // io.servicetalk.concurrent.Cancellable
                                        public void cancel() {
                                            threadInterruptingCancellable.cancel();
                                        }
                                    };
                                }), httpServiceContext2.headersFactory()));
                            } catch (Throwable th) {
                                subscriber.onError(th);
                                throw th;
                            }
                        });
                        BlockingStreamingToStreamingService.this.original.handle(httpServiceContext, streamingHttpRequest.toBlockingStreamingRequest(), defaultBlockingStreamingHttpServerResponse);
                        threadInterruptingCancellable.setDone();
                    } catch (Throwable th) {
                        threadInterruptingCancellable.setDone(th);
                        if (defaultBlockingStreamingHttpServerResponse == null || defaultBlockingStreamingHttpServerResponse.markMetaSent()) {
                            SubscriberUtils.safeOnError(subscriber, th);
                        } else if (bufferHttpPayloadWriter.markSubscriberComplete()) {
                            newCompletableProcessor.onError(th);
                        } else {
                            BlockingStreamingToStreamingService.LOGGER.error("An exception occurred after the response was sent", th);
                        }
                    }
                } catch (Throwable th2) {
                    SubscriberUtils.handleExceptionFromOnSubscribe(subscriber, th2);
                }
            }
        };
    }

    @Override // io.servicetalk.http.api.StreamingHttpService, io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsync() {
        BlockingStreamingHttpService blockingStreamingHttpService = this.original;
        blockingStreamingHttpService.getClass();
        return BlockingUtils.blockingToCompletable(blockingStreamingHttpService::close);
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsyncGracefully() {
        BlockingStreamingHttpService blockingStreamingHttpService = this.original;
        blockingStreamingHttpService.getClass();
        return BlockingUtils.blockingToCompletable(blockingStreamingHttpService::closeGracefully);
    }
}
