package io.servicetalk.http.netty;

import io.netty.channel.Channel;
import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.SourceAdapters;
import io.servicetalk.concurrent.internal.ConcurrentUtils;
import io.servicetalk.concurrent.internal.SubscriberUtils;
import io.servicetalk.transport.api.ConnectionContext;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.SslConfig;
import io.servicetalk.transport.netty.internal.FlushStrategy;
import io.servicetalk.transport.netty.internal.NettyConnection;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.WriteDemandEstimator;
import io.servicetalk.transport.netty.internal.WriteDemandEstimators;
import io.servicetalk.utils.internal.PlatformDependent;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.net.ssl.SSLSession;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/servicetalk/http/netty/NettyPipelinedConnection.class */
/**
 * Wraps a {@link NettyConnection} and funnels every request/response exchange through two queues:
 * a write queue of {@link WriteTask}s and a read queue of response subscribers.
 *
 * <p>Each queue is paired with an {@code int} lock field that is manipulated through
 * {@link ConcurrentUtils#tryAcquireLock} / {@link ConcurrentUtils#releaseLock}. A task is only started by
 * code that holds the corresponding lock, and the lock is handed to the next queued task (or released) when
 * the running write/read terminates. All {@link NettyConnectionContext} methods simply delegate to the
 * wrapped connection.
 *
 * <p>NOTE(review): this file is decompiler output; the widened access modifiers and the
 * {@code mo1296executionContext} name are decompiler artifacts that are kept so that other decompiled
 * classes referencing them keep resolving.
 *
 * @param <Req> type of the items written to the wrapped connection
 * @param <Resp> type of the items read from the wrapped connection
 */
public final class NettyPipelinedConnection<Req, Resp> implements NettyConnectionContext {
    // The raw type argument is forced by the class literal handed to newUpdater(...).
    private static final AtomicIntegerFieldUpdater<NettyPipelinedConnection> writeQueueLockUpdater = AtomicIntegerFieldUpdater.newUpdater(NettyPipelinedConnection.class, "writeQueueLock");
    private static final AtomicIntegerFieldUpdater<NettyPipelinedConnection> readQueueLockUpdater = AtomicIntegerFieldUpdater.newUpdater(NettyPipelinedConnection.class, "readQueueLock");
    /** Upper bound for the size hint passed to both queues at construction time. */
    private static final int MAX_INIT_QUEUE_SIZE = 8;
    private final NettyConnection<Resp, Req> connection;
    /** Writes waiting for their turn; drained by whoever holds {@link #writeQueueLock}. */
    private final Queue<NettyPipelinedConnection<Req, Resp>.WriteTask> writeQueue;
    /** Response subscribers waiting for their turn; drained by whoever holds {@link #readQueueLock}. */
    private final Queue<PublisherSource.Subscriber<? super Resp>> readQueue;
    // Both fields are only accessed reflectively through the updaters above.
    private volatile int writeQueueLock;
    private volatile int readQueueLock;

    /**
     * One queued request: the request payload, how to flush/estimate it, and the subscriber that receives
     * the merged write-completion + response stream.
     */
    public final class WriteTask {
        private final PublisherSource.Subscriber<? super Resp> subscriber;
        private final Publisher<Req> requestPublisher;
        private final Supplier<FlushStrategy> flushStrategySupplier;
        private final Supplier<WriteDemandEstimator> writeDemandEstimatorSupplier;

        private WriteTask(PublisherSource.Subscriber<? super Resp> subscriber, Publisher<Req> requestPublisher, Supplier<FlushStrategy> flushStrategySupplier, Supplier<WriteDemandEstimator> writeDemandEstimatorSupplier) {
            this.subscriber = subscriber;
            this.requestPublisher = requestPublisher;
            this.flushStrategySupplier = flushStrategySupplier;
            this.writeDemandEstimatorSupplier = writeDemandEstimatorSupplier;
        }

        /**
         * Starts this write on the wrapped connection and subscribes {@link #subscriber} to the write merged
         * with a response publisher.
         *
         * <p>When the write terminates, the next queued {@link WriteTask} (if any) is polled and run. When
         * the merged response publisher is subscribed, its subscriber is added to the read queue and a read
         * is started if the read lock could be taken. Any exception thrown while wiring this up is routed to
         * {@link NettyPipelinedConnection#handleWriteSetupError}.
         */
        void run() {
            try {
                SourceAdapters.toSource(connection.write(requestPublisher, flushStrategySupplier, writeDemandEstimatorSupplier).afterFinally(() -> {
                    // The write lock is still held here; hand it to the next queued write, if there is one.
                    WriteTask nextWrite = pollWithLockAcquired(writeQueue, writeQueueLockUpdater);
                    if (nextWrite != null) {
                        nextWrite.run();
                    }
                }).mergeDelayError(new Publisher<Resp>() { // from class: io.servicetalk.http.netty.NettyPipelinedConnection.WriteTask.1
                    @Override // io.servicetalk.concurrent.api.Publisher
                    protected void handleSubscribe(PublisherSource.Subscriber<? super Resp> readSubscriber) {
                        try {
                            // Enqueue this reader; start a read only if the read lock was obtained.
                            WriteTask.this.tryStartRead(NettyPipelinedConnection.this.addAndTryPoll(NettyPipelinedConnection.this.readQueue, NettyPipelinedConnection.readQueueLockUpdater, readSubscriber));
                        } catch (Throwable th) {
                            NettyPipelinedConnection.this.closeConnection(readSubscriber, th);
                        }
                    }
                })).subscribe(subscriber);
            } catch (Throwable th) {
                handleWriteSetupError(subscriber, th);
            }
        }

        /**
         * Subscribes {@code subscriber} to a read of the wrapped connection; when that read terminates the
         * next queued reader is polled and started the same way.
         *
         * @param subscriber the reader to start, or {@code null} when there is nothing to start (no-op)
         */
        public void tryStartRead(@Nullable PublisherSource.Subscriber<? super Resp> subscriber) {
            if (subscriber == null) {
                return;
            }
            try {
                SourceAdapters.toSource(connection.read().afterFinally(() -> {
                    // The read lock is still held here; pass it on to the next queued reader, if any.
                    tryStartRead(pollWithLockAcquired(readQueue, readQueueLockUpdater));
                })).subscribe(subscriber);
            } catch (Throwable th) {
                handleReadSetupError(subscriber, th);
            }
        }
    }

    /**
     * Creates a pipelined view over {@code nettyConnection}.
     *
     * @param nettyConnection the connection to wrap; must not be {@code null}
     * @param i size hint for both queues, capped at {@link #MAX_INIT_QUEUE_SIZE}
     *          (presumably the maximum number of pipelined requests; confirm against callers)
     * @throws NullPointerException if {@code nettyConnection} is {@code null}
     */
    public NettyPipelinedConnection(NettyConnection<Resp, Req> nettyConnection, int i) {
        this.connection = Objects.requireNonNull(nettyConnection);
        this.writeQueue = PlatformDependent.newUnboundedMpscQueue(Math.min(i, MAX_INIT_QUEUE_SIZE));
        this.readQueue = PlatformDependent.newUnboundedMpscQueue(Math.min(i, MAX_INIT_QUEUE_SIZE));
    }

    /**
     * Same as {@link #write(Publisher, Supplier, Supplier)} using the wrapped connection's default flush
     * strategy and {@link WriteDemandEstimators#newDefaultEstimator()}.
     *
     * @param publisher the request payload to write
     * @return a publisher that performs the write and emits the response when subscribed
     */
    public Publisher<Resp> write(Publisher<Req> publisher) {
        // A bound method reference already null-checks its receiver, so no explicit check is needed.
        return write(publisher, connection::defaultFlushStrategy, WriteDemandEstimators::newDefaultEstimator);
    }

    /**
     * Returns a publisher that, on each subscribe, enqueues a new {@link WriteTask} and runs whichever task
     * is polled from the write queue if the write lock could be acquired. If enqueueing throws, the
     * connection is closed and the subscriber is failed with that error.
     *
     * @param publisher the request payload to write
     * @param supplier supplies the {@link FlushStrategy} handed to the wrapped connection's write
     * @param supplier2 supplies the {@link WriteDemandEstimator} handed to the wrapped connection's write
     * @return a publisher that performs the write and emits the response when subscribed
     */
    public Publisher<Resp> write(final Publisher<Req> publisher, final Supplier<FlushStrategy> supplier, final Supplier<WriteDemandEstimator> supplier2) {
        return new Publisher<Resp>() { // from class: io.servicetalk.http.netty.NettyPipelinedConnection.1
            @Override // io.servicetalk.concurrent.api.Publisher
            protected void handleSubscribe(PublisherSource.Subscriber<? super Resp> subscriber) {
                try {
                    WriteTask nextWrite = NettyPipelinedConnection.this.addAndTryPoll(NettyPipelinedConnection.this.writeQueue, NettyPipelinedConnection.writeQueueLockUpdater, new WriteTask(subscriber, publisher, supplier, supplier2));
                    if (nextWrite != null) {
                        nextWrite.run();
                    }
                } catch (Throwable th) {
                    NettyPipelinedConnection.this.closeConnection(subscriber, th);
                }
            }
        };
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo
    public SocketAddress localAddress() {
        return this.connection.localAddress();
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo
    public SocketAddress remoteAddress() {
        return this.connection.remoteAddress();
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo
    @Nullable
    public SslConfig sslConfig() {
        return this.connection.sslConfig();
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo
    @Nullable
    public SSLSession sslSession() {
        return this.connection.sslSession();
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo, io.servicetalk.http.api.HttpConnectionContext
    /* renamed from: executionContext */
    public ExecutionContext<?> mo1296executionContext() {
        return this.connection.mo1296executionContext();
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo
    @Nullable
    public <T> T socketOption(SocketOption<T> socketOption) {
        return this.connection.socketOption(socketOption);
    }

    @Override // io.servicetalk.transport.api.ConnectionInfo, io.servicetalk.http.api.HttpConnectionContext
    public ConnectionInfo.Protocol protocol() {
        return this.connection.protocol();
    }

    @Override // io.servicetalk.transport.api.ConnectionContext
    @Nullable
    public ConnectionContext parent() {
        return this.connection.parent();
    }

    @Override // io.servicetalk.transport.netty.internal.NettyConnectionContext
    public Single<Throwable> transportError() {
        return this.connection.transportError();
    }

    @Override // io.servicetalk.transport.netty.internal.NettyConnectionContext, io.servicetalk.concurrent.api.ListenableAsyncCloseable
    public Completable onClosing() {
        return this.connection.onClosing();
    }

    @Override // io.servicetalk.concurrent.api.ListenableAsyncCloseable
    public Completable onClose() {
        return this.connection.onClose();
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsync() {
        return this.connection.closeAsync();
    }

    @Override // io.servicetalk.concurrent.api.AsyncCloseable
    public Completable closeAsyncGracefully() {
        return this.connection.closeAsyncGracefully();
    }

    @Override // io.servicetalk.transport.netty.internal.NettyConnectionContext
    public Channel nettyChannel() {
        return this.connection.nettyChannel();
    }

    /** Returns the wrapped connection's string form. */
    @Override
    public String toString() {
        return this.connection.toString();
    }

    @Override // io.servicetalk.transport.netty.internal.NettyConnectionContext
    public Cancellable updateFlushStrategy(NettyConnectionContext.FlushStrategyProvider flushStrategyProvider) {
        return this.connection.updateFlushStrategy(flushStrategyProvider);
    }

    @Override // io.servicetalk.transport.netty.internal.NettyConnectionContext
    public FlushStrategy defaultFlushStrategy() {
        return this.connection.defaultFlushStrategy();
    }

    /**
     * Closes the wrapped connection and then fails {@code subscriber} with {@code th}.
     *
     * @param subscriber the subscriber to terminate
     * @param th the error delivered after the close completes
     */
    public void closeConnection(PublisherSource.Subscriber<? super Resp> subscriber, Throwable th) {
        // The explicit <Resp> is required: without a target type the element type would be inferred as
        // Object, which is not compatible with a Subscriber<? super Resp>.
        SourceAdapters.toSource(this.connection.closeAsync().concat(Publisher.<Resp>failed(th))).subscribe(subscriber);
    }

    /**
     * Recovers from a failure to start a write: closes the connection, fails {@code subscriber}, and then
     * fails every write still sitting in the write queue with the same error.
     *
     * <p>Expects the caller to hold the write lock: the loop releases it without acquiring it first.
     * NOTE(review): the loop keeps draining while {@code releaseLock} returns {@code false} and the lock can
     * be re-acquired; presumably {@code false} signals that another thread tried to take the lock in the
     * meantime. Confirm against {@link ConcurrentUtils}.
     *
     * @param subscriber the subscriber of the write that failed to start
     * @param th the setup failure
     */
    public void handleWriteSetupError(PublisherSource.Subscriber<? super Resp> subscriber, Throwable th) {
        closeConnection(subscriber, th);
        do {
            WriteTask pending;
            while ((pending = this.writeQueue.poll()) != null) {
                SubscriberUtils.deliverErrorFromSource(pending.subscriber, th);
            }
        } while (!ConcurrentUtils.releaseLock(writeQueueLockUpdater, this) && ConcurrentUtils.tryAcquireLock(writeQueueLockUpdater, this));
    }

    /**
     * Recovers from a failure to start a read: closes the connection, fails {@code subscriber}, and then
     * fails every subscriber still sitting in the read queue with the same error.
     *
     * <p>Expects the caller to hold the read lock: the loop releases it without acquiring it first.
     * NOTE(review): same release/re-acquire assumption as in
     * {@link #handleWriteSetupError(PublisherSource.Subscriber, Throwable)}.
     *
     * @param subscriber the subscriber of the read that failed to start
     * @param th the setup failure
     */
    public void handleReadSetupError(PublisherSource.Subscriber<? super Resp> subscriber, Throwable th) {
        closeConnection(subscriber, th);
        do {
            PublisherSource.Subscriber<? super Resp> pending;
            while ((pending = this.readQueue.poll()) != null) {
                SubscriberUtils.deliverErrorFromSource(pending, th);
            }
        } while (!ConcurrentUtils.releaseLock(readQueueLockUpdater, this) && ConcurrentUtils.tryAcquireLock(readQueueLockUpdater, this));
    }

    /**
     * Adds {@code t} to {@code queue} and then tries to take the lock behind {@code atomicIntegerFieldUpdater}.
     *
     * @param queue the queue to add to and poll from
     * @param atomicIntegerFieldUpdater updater for the lock field guarding {@code queue}
     * @param t the item to enqueue
     * @param <T> queue element type
     * @return the polled head of the queue, in which case the lock is still held on return; or
     *         {@code null} when the lock could not be acquired or the queue turned out to be empty, in which
     *         case this call no longer holds the lock
     */
    @Nullable
    public <T> T addAndTryPoll(Queue<T> queue, AtomicIntegerFieldUpdater<NettyPipelinedConnection> atomicIntegerFieldUpdater, T t) {
        queue.add(t);
        while (ConcurrentUtils.tryAcquireLock(atomicIntegerFieldUpdater, this)) {
            T head = queue.poll();
            if (head != null) {
                // Return with the lock held; it is passed on or released when the returned task finishes.
                return head;
            }
            if (ConcurrentUtils.releaseLock(atomicIntegerFieldUpdater, this)) {
                return null;
            }
            // releaseLock returned false: loop and try to take the lock again before giving up.
        }
        return null;
    }

    /**
     * Polls the next item from {@code queue}; the caller is expected to hold the lock behind
     * {@code atomicIntegerFieldUpdater} already (the first iteration polls and releases without acquiring).
     *
     * <p>If polling or releasing throws, the wrapped connection is closed (fire-and-forget) and the
     * throwable is rethrown.
     *
     * @param queue the queue to poll from
     * @param atomicIntegerFieldUpdater updater for the lock field guarding {@code queue}
     * @param <T> queue element type
     * @return the polled item, in which case the lock is still held on return; or {@code null} when the
     *         queue was empty and this call no longer holds the lock
     */
    @Nullable
    public <T> T pollWithLockAcquired(Queue<T> queue, AtomicIntegerFieldUpdater<NettyPipelinedConnection> atomicIntegerFieldUpdater) {
        do {
            try {
                T head = queue.poll();
                if (head != null) {
                    return head;
                }
                if (ConcurrentUtils.releaseLock(atomicIntegerFieldUpdater, this)) {
                    return null;
                }
            } catch (Throwable th) {
                this.connection.closeAsync().subscribe();
                throw th;
            }
        } while (ConcurrentUtils.tryAcquireLock(atomicIntegerFieldUpdater, this));
        return null;
    }
}
