package io.servicetalk.http.api;

import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Executor;
import io.servicetalk.concurrent.api.Single;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import javax.annotation.Nullable;

/* loaded from: input_file:io/servicetalk/http/api/StreamingHttpServiceToOffloadedStreamingHttpService.class */
public class StreamingHttpServiceToOffloadedStreamingHttpService implements StreamingHttpService {
    private final StreamingHttpService delegate;

    @Nullable
    private final Executor executor;
    private final BooleanSupplier shouldOffload;
    private final HttpExecutionStrategy strategy;

    StreamingHttpServiceToOffloadedStreamingHttpService(HttpExecutionStrategy httpExecutionStrategy, @Nullable Executor executor, BooleanSupplier booleanSupplier, StreamingHttpService streamingHttpService) {
        this.strategy = httpExecutionStrategy;
        this.executor = executor;
        this.shouldOffload = booleanSupplier;
        this.delegate = streamingHttpService;
    }

    @Override // io.servicetalk.http.api.StreamingHttpService
    public Single<StreamingHttpResponse> handle(HttpServiceContext httpServiceContext, StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
        Single<StreamingHttpResponse> handle;
        HttpExecutionStrategy missing = httpServiceContext.mo1291executionContext().executionStrategy().missing(this.strategy);
        Executor executor = null != this.executor ? this.executor : httpServiceContext.mo1291executionContext().executor();
        ExecutionContextOverridingServiceContext executionContextOverridingServiceContext = new ExecutionContextOverridingServiceContext(httpServiceContext, this.strategy, executor);
        if (!missing.isRequestResponseOffloaded()) {
            return this.delegate.handle(executionContextOverridingServiceContext, streamingHttpRequest, streamingHttpResponseFactory);
        }
        if (missing.isDataReceiveOffloaded()) {
            streamingHttpRequest = streamingHttpRequest.transformMessageBody(publisher -> {
                return publisher.publishOn(executor, this.shouldOffload);
            });
        }
        if (missing.isMetadataReceiveOffloaded() && this.shouldOffload.getAsBoolean()) {
            StreamingHttpRequest streamingHttpRequest2 = streamingHttpRequest;
            handle = executor.submit(() -> {
                return this.delegate.handle(executionContextOverridingServiceContext, streamingHttpRequest2, streamingHttpResponseFactory).shareContextOnSubscribe();
            }).flatMap(Function.identity());
        } else {
            handle = this.delegate.handle(executionContextOverridingServiceContext, streamingHttpRequest, streamingHttpResponseFactory);
        }
        return missing.isSendOffloaded() ? handle.map(streamingHttpResponse -> {
            return streamingHttpResponse.transformMessageBody(publisher2 -> {
                return publisher2.subscribeOn(executor, this.shouldOffload);
            });
        }).subscribeOn(executor, this.shouldOffload) : handle;
    }

    @Override // io.servicetalk.http.api.StreamingHttpService, io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsync() {
        return this.delegate.closeAsync();
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsyncGracefully() {
        return this.delegate.closeAsyncGracefully();
    }

    public static StreamingHttpService offloadService(final HttpExecutionStrategy httpExecutionStrategy, @Nullable final Executor executor, BooleanSupplier booleanSupplier, final StreamingHttpService streamingHttpService) {
        return httpExecutionStrategy.isRequestResponseOffloaded() ? new StreamingHttpServiceToOffloadedStreamingHttpService(httpExecutionStrategy, executor, booleanSupplier, streamingHttpService) : new StreamingHttpService() { // from class: io.servicetalk.http.api.StreamingHttpServiceToOffloadedStreamingHttpService.1
            @Override // io.servicetalk.http.api.StreamingHttpService
            public Single<StreamingHttpResponse> handle(HttpServiceContext httpServiceContext, StreamingHttpRequest streamingHttpRequest, StreamingHttpResponseFactory streamingHttpResponseFactory) {
                return streamingHttpService.handle(new ExecutionContextOverridingServiceContext(httpServiceContext, httpExecutionStrategy, null != Executor.this ? Executor.this : httpServiceContext.mo1291executionContext().executor()), streamingHttpRequest, streamingHttpResponseFactory);
            }

            @Override // io.servicetalk.http.api.StreamingHttpService, io.servicetalk.concurrent.api.AsyncCloseable
            public Completable closeAsync() {
                return streamingHttpService.closeAsync();
            }

            @Override // io.servicetalk.concurrent.api.AsyncCloseable
            public Completable closeAsyncGracefully() {
                return streamingHttpService.closeAsyncGracefully();
            }
        };
    }
}
