package io.servicetalk.http.api;

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.api.internal.SubscribableCompletable;
import io.servicetalk.concurrent.internal.DelayedCancellable;
import io.servicetalk.utils.internal.PlatformDependent;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService.class */
final class StreamingHttpServiceToBlockingStreamingHttpService implements BlockingStreamingHttpService {
    private final StreamingHttpService original;

    /* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService$PayloadBodyAndTrailersToPayloadWriter.class */
    private static class PayloadBodyAndTrailersToPayloadWriter extends SubscribableCompletable {
        private final Publisher<Object> payloadBodyAndTrailers;
        private final HttpPayloadWriter<Buffer> payloadWriter;

        /* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToBlockingStreamingHttpService$PayloadBodyAndTrailersToPayloadWriter$PayloadPump.class */
        private static final class PayloadPump extends DelayedCancellable implements PublisherSource.Subscriber<Object> {
            private static final Logger LOGGER;
            private static final AtomicIntegerFieldUpdater<PayloadPump> terminatedUpdater;
            private final CompletableSource.Subscriber subscriber;
            private final HttpPayloadWriter<Buffer> payloadWriter;
            private volatile int terminated;
            static final /* synthetic */ boolean $assertionsDisabled;

            PayloadPump(CompletableSource.Subscriber subscriber, HttpPayloadWriter<Buffer> httpPayloadWriter) {
                this.subscriber = subscriber;
                this.payloadWriter = httpPayloadWriter;
            }

            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onSubscribe(PublisherSource.Subscription subscription) {
                this.subscriber.onSubscribe(this);
                subscription.request(Long.MAX_VALUE);
                delayedCancellable(subscription);
            }

            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onNext(@Nullable Object obj) {
                if (!$assertionsDisabled && obj == null) {
                    throw new AssertionError();
                }
                try {
                    if (obj instanceof Buffer) {
                        this.payloadWriter.write((Buffer) obj);
                    } else if (obj instanceof HttpHeaders) {
                        this.payloadWriter.setTrailers((HttpHeaders) obj);
                    } else if (!$assertionsDisabled) {
                        throw new AssertionError("Expected only buffer or trailer in payloadBodyAndTrailers()");
                    }
                } catch (IOException e) {
                    try {
                        if (tryTerminate()) {
                            this.subscriber.onError(e);
                        } else {
                            PlatformDependent.throwException(e);
                        }
                    } finally {
                        cancel();
                    }
                }
            }

            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onError(Throwable th) {
                if (tryTerminate()) {
                    this.subscriber.onError(th);
                } else {
                    LOGGER.error("Failed to deliver onError() after termination", th);
                }
            }

            @Override // io.servicetalk.concurrent.PublisherSource.Subscriber
            public void onComplete() {
                try {
                    this.payloadWriter.close();
                } catch (IOException e) {
                    if (tryTerminate()) {
                        this.subscriber.onError(e);
                    } else {
                        LOGGER.warn("Failed to deliver IOException from payloadWriter.close() after termination", (Throwable) e);
                    }
                }
                if (tryTerminate()) {
                    this.subscriber.onComplete();
                }
            }

            boolean tryTerminate() {
                return terminatedUpdater.compareAndSet(this, 0, 1);
            }

            static {
                $assertionsDisabled = !StreamingHttpServiceToBlockingStreamingHttpService.class.desiredAssertionStatus();
                LOGGER = LoggerFactory.getLogger((Class<?>) PayloadPump.class);
                terminatedUpdater = AtomicIntegerFieldUpdater.newUpdater(PayloadPump.class, "terminated");
            }
        }

        PayloadBodyAndTrailersToPayloadWriter(Publisher<Object> publisher, HttpPayloadWriter<Buffer> httpPayloadWriter) {
            this.payloadBodyAndTrailers = publisher;
            this.payloadWriter = httpPayloadWriter;
        }

        @Override // io.servicetalk.concurrent.api.Completable
        protected void handleSubscribe(CompletableSource.Subscriber subscriber) {
            SourceAdapters.toSource(this.payloadBodyAndTrailers).subscribe(new PayloadPump(subscriber, this.payloadWriter));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamingHttpServiceToBlockingStreamingHttpService(StreamingHttpService streamingHttpService) {
        this.original = (StreamingHttpService) Objects.requireNonNull(streamingHttpService);
    }

    @Override // io.servicetalk.http.api.BlockingStreamingHttpService
    public void handle(HttpServiceContext httpServiceContext, BlockingStreamingHttpRequest blockingStreamingHttpRequest, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) throws Exception {
        BlockingUtils.futureGetCancelOnInterrupt(handleBlockingRequest(httpServiceContext, blockingStreamingHttpRequest, blockingStreamingHttpServerResponse).toFuture());
    }

    @Nonnull
    private Completable handleBlockingRequest(HttpServiceContext httpServiceContext, BlockingStreamingHttpRequest blockingStreamingHttpRequest, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) {
        return this.original.handle(httpServiceContext, blockingStreamingHttpRequest.toStreamingRequest(), httpServiceContext.streamingResponseFactory()).flatMapCompletable(streamingHttpResponse -> {
            copyMeta(streamingHttpResponse, blockingStreamingHttpServerResponse);
            return new PayloadBodyAndTrailersToPayloadWriter(streamingHttpResponse.payloadBodyAndTrailers(), blockingStreamingHttpServerResponse.sendMetaData());
        });
    }

    private void copyMeta(StreamingHttpResponse streamingHttpResponse, BlockingStreamingHttpServerResponse blockingStreamingHttpServerResponse) {
        blockingStreamingHttpServerResponse.setHeaders(streamingHttpResponse.headers());
        blockingStreamingHttpServerResponse.status(streamingHttpResponse.status());
        blockingStreamingHttpServerResponse.version(streamingHttpResponse.version());
    }

    @Override // io.servicetalk.http.api.BlockingStreamingHttpService, java.lang.AutoCloseable
    public void close() throws Exception {
        this.original.closeAsync().toFuture().get();
    }

    @Override // io.servicetalk.concurrent.GracefulAutoCloseable
    public void closeGracefully() throws Exception {
        this.original.closeAsyncGracefully().toFuture().get();
    }
}
