package reactor.aeron.client;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronResources;
import reactor.aeron.ByteBufferFlux;
import reactor.aeron.DataMessageSubscriber;
import reactor.aeron.InnerPoller;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.core.publisher.Mono;

/* loaded from: input_file:reactor/aeron/client/AeronClientInbound.class */
final class AeronClientInbound implements AeronInbound, OnDisposable {
    private final String name;
    private final AeronResources resources;
    private volatile ByteBufferFlux flux;
    private volatile InnerPoller subscription;

    /* loaded from: input_file:reactor/aeron/client/AeronClientInbound$ClientDataMessageProcessor.class */
    private static class ClientDataMessageProcessor implements DataMessageSubscriber, Publisher<ByteBuffer> {
        private static final Logger logger = LoggerFactory.getLogger(ClientDataMessageProcessor.class);
        private final String category;
        private final long sessionId;
        private final Runnable onCompleteHandler;
        private volatile Subscription subscription;
        private volatile Subscriber<? super ByteBuffer> subscriber;

        private ClientDataMessageProcessor(String str, long j, Runnable runnable) {
            this.category = str;
            this.sessionId = j;
            this.onCompleteHandler = runnable;
        }

        @Override // reactor.aeron.DataMessageSubscriber
        public void onSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        @Override // reactor.aeron.DataMessageSubscriber
        public void onNext(long j, ByteBuffer byteBuffer) {
            if (j != this.sessionId) {
                throw new RuntimeException("Received " + MessageType.NEXT + " for unknown sessionId: " + j);
            }
            this.subscriber.onNext(byteBuffer);
        }

        @Override // reactor.aeron.DataMessageSubscriber
        public void onComplete(long j) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
            if (this.sessionId == j) {
                this.onCompleteHandler.run();
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this.subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronClientInbound(String str, AeronResources aeronResources) {
        this.name = str;
        this.resources = aeronResources;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> start(String str, int i, long j, Runnable runnable) {
        return Mono.defer(() -> {
            ClientDataMessageProcessor clientDataMessageProcessor = new ClientDataMessageProcessor(this.name, j, runnable);
            this.flux = new ByteBufferFlux(clientDataMessageProcessor);
            return this.resources.dataSubscription(this.name, str, i, clientDataMessageProcessor, this.resources.nextEventLoop(), null, image -> {
                Optional.ofNullable(runnable).ifPresent((v0) -> {
                    v0.run();
                });
            }).doOnSuccess(innerPoller -> {
                this.subscription = innerPoller;
                clientDataMessageProcessor.onSubscription(this.subscription);
            }).then().log("clientInbound");
        });
    }

    @Override // reactor.aeron.AeronInbound
    public ByteBufferFlux receive() {
        return this.flux;
    }

    public void dispose() {
        if (this.subscription != null) {
            this.subscription.dispose();
        }
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.subscription != null ? this.subscription.onDispose() : Mono.empty();
    }

    public boolean isDisposed() {
        return this.subscription != null && this.subscription.isDisposed();
    }
}
