package io.reactivex.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.EmptyArrays;
import io.reactivex.netty.channel.BackpressureManagingHandler;
import io.reactivex.netty.channel.events.ConnectionEventListener;
import io.reactivex.netty.events.EventPublisher;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Producer;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;

/* loaded from: input_file:io/reactivex/netty/channel/AbstractConnectionToChannelBridge.class */
public abstract class AbstractConnectionToChannelBridge<R, W> extends BackpressureManagingHandler {
    private static final Logger logger;
    private static final IllegalStateException ONLY_ONE_CONN_SUB_ALLOWED;
    private static final IllegalStateException ONLY_ONE_CONN_INPUT_SUB_ALLOWED;
    private static final IllegalStateException LAZY_CONN_INPUT_SUB;
    private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION;
    private final AttributeKey<ConnectionEventListener> eventListenerAttributeKey;
    private final AttributeKey<EventPublisher> eventPublisherAttributeKey;
    protected ConnectionEventListener eventListener;
    protected EventPublisher eventPublisher;
    private Subscriber<? super Channel> newChannelSub;
    private ReadProducer<R> readProducer;
    private boolean raiseErrorOnInputSubscription;
    private boolean connectionEmitted;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/reactivex/netty/channel/AbstractConnectionToChannelBridge$ReadProducer.class */
    public static final class ReadProducer<T> extends BackpressureManagingHandler.RequestReadIfRequiredEvent implements Producer {
        private static final AtomicLongFieldUpdater<ReadProducer> REQUEST_UPDATER = AtomicLongFieldUpdater.newUpdater(ReadProducer.class, "requested");
        private volatile long requested;
        private final Subscriber<? super T> subscriber;
        private final Channel channel;

        ReadProducer(Subscriber<? super T> subscriber, Channel channel) {
            this.subscriber = subscriber;
            this.channel = channel;
        }

        @Override // rx.Producer
        public void request(long j) {
            long j2;
            long j3;
            if (Long.MAX_VALUE != this.requested) {
                if (Long.MAX_VALUE == j) {
                    REQUEST_UPDATER.set(this, Long.MAX_VALUE);
                }
                do {
                    j2 = this.requested;
                    j3 = j2 + j;
                    if (j3 < 0) {
                        j3 = Long.MAX_VALUE;
                    }
                } while (!REQUEST_UPDATER.compareAndSet(this, j2, j3));
            }
            if (this.channel.config().isAutoRead()) {
                return;
            }
            this.channel.pipeline().fireUserEventTriggered(this);
        }

        public void sendOnError(Throwable th) {
            this.subscriber.onError(th);
        }

        public void sendOnComplete() {
            this.subscriber.onCompleted();
        }

        public void sendOnNext(T t) {
            if (this.requested <= 0) {
                this.subscriber.onError(new MissingBackpressureException("Received more data on the channel than demanded by the subscriber."));
                return;
            }
            if (REQUEST_UPDATER.get(this) != Long.MAX_VALUE) {
                REQUEST_UPDATER.decrementAndGet(this);
            }
            this.subscriber.onNext(t);
        }

        @Override // io.reactivex.netty.channel.BackpressureManagingHandler.RequestReadIfRequiredEvent
        protected boolean shouldReadMore(ChannelHandlerContext channelHandlerContext) {
            return !this.subscriber.isUnsubscribed() && REQUEST_UPDATER.get(this) > 0;
        }

        long getRequested() {
            return this.requested;
        }

        public String toString() {
            return "ReadProducer{requested=" + this.requested + '}';
        }
    }

    protected AbstractConnectionToChannelBridge(String str, ConnectionEventListener connectionEventListener, EventPublisher eventPublisher) {
        super(str);
        if (null == connectionEventListener) {
            throw new IllegalArgumentException("Event listener can not be null.");
        }
        if (null == eventPublisher) {
            throw new IllegalArgumentException("Event publisher can not be null.");
        }
        this.eventListener = connectionEventListener;
        this.eventPublisher = eventPublisher;
        this.eventListenerAttributeKey = null;
        this.eventPublisherAttributeKey = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractConnectionToChannelBridge(String str, AttributeKey<ConnectionEventListener> attributeKey, AttributeKey<EventPublisher> attributeKey2) {
        super(str);
        this.eventListenerAttributeKey = attributeKey;
        this.eventPublisherAttributeKey = attributeKey2;
    }

    @Override // io.reactivex.netty.channel.BackpressureManagingHandler, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler
    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (null == this.eventListener && null == this.eventPublisher) {
            this.eventListener = (ConnectionEventListener) channelHandlerContext.channel().attr(this.eventListenerAttributeKey).get();
            this.eventPublisher = (EventPublisher) channelHandlerContext.channel().attr(this.eventPublisherAttributeKey).get();
        }
        if (null == this.eventPublisher) {
            logger.error("No Event publisher bound to the channel, closing channel.");
            channelHandlerContext.channel().close();
        } else if (this.eventPublisher.publishingEnabled() && null == this.eventListener) {
            logger.error("No Event listener bound to the channel and publising is enabled, closing channel.");
            channelHandlerContext.channel().close();
        } else {
            channelHandlerContext.pipeline().addFirst(new BytesInspector(this.eventPublisher, this.eventListener));
            super.handlerAdded(channelHandlerContext);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (!this.connectionEmitted && isValidToEmit(this.newChannelSub)) {
            emitNewConnection(channelHandlerContext.channel());
            this.connectionEmitted = true;
        }
        super.channelInactive(channelHandlerContext);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (isValidToEmitToReadSubscriber(this.readProducer)) {
            this.readProducer.sendOnError(CLOSED_CHANNEL_EXCEPTION);
        }
        super.channelUnregistered(channelHandlerContext);
    }

    @Override // io.reactivex.netty.channel.BackpressureManagingHandler, io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (obj instanceof EmitConnectionEvent) {
            if (!this.connectionEmitted) {
                emitNewConnection(channelHandlerContext.channel());
                this.connectionEmitted = true;
            }
        } else if (obj instanceof ConnectionCreationFailedEvent) {
            if (isValidToEmit(this.newChannelSub)) {
                this.newChannelSub.onError(((ConnectionCreationFailedEvent) obj).getThrowable());
            }
        } else if (obj instanceof ChannelSubscriberEvent) {
            newConnectionSubscriber((ChannelSubscriberEvent) obj);
        } else if (obj instanceof ConnectionInputSubscriberEvent) {
            newConnectionInputSubscriber(channelHandlerContext.channel(), ((ConnectionInputSubscriberEvent) obj).getSubscriber(), false);
        } else if (obj instanceof ConnectionInputSubscriberResetEvent) {
            resetConnectionInputSubscriber();
        } else if (obj instanceof ConnectionInputSubscriberReplaceEvent) {
            replaceConnectionInputSubscriber(channelHandlerContext.channel(), (ConnectionInputSubscriberReplaceEvent) obj);
        }
        super.userEventTriggered(channelHandlerContext, obj);
    }

    @Override // io.reactivex.netty.channel.BackpressureManagingHandler
    public void newMessage(ChannelHandlerContext channelHandlerContext, Object obj) {
        if (!isValidToEmitToReadSubscriber(this.readProducer)) {
            if (logger.isWarnEnabled()) {
                logger.warn("Data received on channel, but no subscriber registered. Discarding data. Message class: " + obj.getClass().getName() + ", channel: " + channelHandlerContext.channel());
            }
            ReferenceCountUtil.release(obj);
        } else {
            try {
                this.readProducer.sendOnNext(obj);
            } catch (ClassCastException e) {
                ReferenceCountUtil.release(obj);
                this.readProducer.sendOnError(e);
            }
        }
    }

    @Override // io.reactivex.netty.channel.BackpressureManagingHandler
    public boolean shouldReadMore(ChannelHandlerContext channelHandlerContext) {
        return null != this.readProducer && this.readProducer.shouldReadMore(channelHandlerContext);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelHandlerAdapter, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (!this.connectionEmitted && isValidToEmit(this.newChannelSub)) {
            this.newChannelSub.onError(th);
        } else if (isValidToEmitToReadSubscriber(this.readProducer)) {
            this.readProducer.sendOnError(th);
        } else {
            logger.info("Exception in the pipeline and none of the subscribers are active.", th);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static boolean isValidToEmit(Subscriber<?> subscriber) {
        return (null == subscriber || subscriber.isUnsubscribed()) ? false : true;
    }

    private static boolean isValidToEmitToReadSubscriber(ReadProducer<?> readProducer) {
        return (null == readProducer || ((ReadProducer) readProducer).subscriber.isUnsubscribed()) ? false : true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean connectionInputSubscriberExists(Channel channel) {
        if ($assertionsDisabled || channel.eventLoop().inEventLoop()) {
            return (null == this.readProducer || null == ((ReadProducer) this.readProducer).subscriber || ((ReadProducer) this.readProducer).subscriber.isUnsubscribed()) ? false : true;
        }
        throw new AssertionError();
    }

    protected void onNewReadSubscriber(Subscriber<? super R> subscriber) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void checkEagerSubscriptionIfConfigured(Channel channel) {
        if (channel.config().isAutoRead() && null == this.readProducer) {
            this.raiseErrorOnInputSubscription = true;
            Subscriber<? super R> subscriber = ConnectionInputSubscriberEvent.discardAllInput().getSubscriber();
            ReadProducer<R> readProducer = new ReadProducer<>(subscriber, channel);
            subscriber.setProducer(readProducer);
            this.readProducer = readProducer;
        }
    }

    protected final Subscriber<? super Channel> getNewChannelSub() {
        return this.newChannelSub;
    }

    private void emitNewConnection(Channel channel) {
        if (!isValidToEmit(this.newChannelSub)) {
            channel.close();
            return;
        }
        try {
            this.newChannelSub.onNext(channel);
            this.connectionEmitted = true;
            checkEagerSubscriptionIfConfigured(channel);
            this.newChannelSub.onCompleted();
        } catch (Exception e) {
            logger.error("Error emitting a new connection. Closing this channel.", (Throwable) e);
            channel.close();
        }
    }

    private void resetConnectionInputSubscriber() {
        Subscriber subscriber = null == this.readProducer ? null : ((ReadProducer) this.readProducer).subscriber;
        if (isValidToEmit(subscriber)) {
            subscriber.onCompleted();
        }
        this.raiseErrorOnInputSubscription = false;
        this.readProducer = null;
    }

    private void newConnectionInputSubscriber(Channel channel, Subscriber<? super R> subscriber, boolean z) {
        Subscriber subscriber2 = null == this.readProducer ? null : ((ReadProducer) this.readProducer).subscriber;
        if (!isValidToEmit(subscriber2)) {
            if (this.raiseErrorOnInputSubscription) {
                subscriber.onError(LAZY_CONN_INPUT_SUB);
                return;
            } else {
                setNewReadProducer(channel, subscriber);
                return;
            }
        }
        if (!z) {
            subscriber.onError(ONLY_ONE_CONN_INPUT_SUB_ALLOWED);
        } else {
            setNewReadProducer(channel, subscriber);
            subscriber2.onCompleted();
        }
    }

    private void setNewReadProducer(Channel channel, Subscriber<? super R> subscriber) {
        ReadProducer<R> readProducer = new ReadProducer<>(subscriber, channel);
        subscriber.setProducer(readProducer);
        onNewReadSubscriber(subscriber);
        this.readProducer = readProducer;
    }

    private void replaceConnectionInputSubscriber(Channel channel, ConnectionInputSubscriberReplaceEvent<R, W> connectionInputSubscriberReplaceEvent) {
        newConnectionInputSubscriber(channel, connectionInputSubscriberReplaceEvent.getNewSubEvent().getSubscriber(), true);
    }

    private void newConnectionSubscriber(ChannelSubscriberEvent<R, W> channelSubscriberEvent) {
        if (null == this.newChannelSub) {
            this.newChannelSub = channelSubscriberEvent.getSubscriber();
        } else {
            channelSubscriberEvent.getSubscriber().onError(ONLY_ONE_CONN_SUB_ALLOWED);
        }
    }

    static {
        $assertionsDisabled = !AbstractConnectionToChannelBridge.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(AbstractConnectionToChannelBridge.class);
        ONLY_ONE_CONN_SUB_ALLOWED = new IllegalStateException("Only one subscriber allowed for connection observable.");
        ONLY_ONE_CONN_INPUT_SUB_ALLOWED = new IllegalStateException("Only one subscriber allowed for connection input.");
        LAZY_CONN_INPUT_SUB = new IllegalStateException("Channel is set to auto-read but the subscription was lazy.");
        CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
        ONLY_ONE_CONN_INPUT_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        ONLY_ONE_CONN_SUB_ALLOWED.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        LAZY_CONN_INPUT_SUB.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
        CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
    }
}
