package reactor.aeron;

import java.nio.ByteBuffer;
import java.util.Optional;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:reactor/aeron/DefaultAeronInbound.class */
/**
 * {@link AeronInbound} implementation that exposes received data messages as a
 * {@link ByteBufferFlux}.
 *
 * <p>The flux and the underlying {@link MessageSubscription} are created lazily: they only exist
 * once the {@link Mono} returned by {@link #start(String, int, long, Runnable)} has been
 * subscribed to. Before that, {@link #receive()} returns {@code null}, {@link #dispose()} does
 * nothing, {@link #onDispose()} returns an empty {@code Mono} and {@link #isDisposed()} returns
 * {@code false}.
 */
public final class DefaultAeronInbound implements AeronInbound, OnDisposable {
    private final String name;
    private final AeronResources resources;
    // Both fields are written from inside the Mono built by start() and read from the public
    // accessors, hence volatile.
    private volatile ByteBufferFlux flux;
    private volatile MessageSubscription subscription;

    /**
     * Bridges the data-message callbacks to a single Reactive Streams {@link Subscriber}.
     *
     * <p>Only the most recently subscribed {@code Subscriber} receives messages; a later call to
     * {@link #subscribe(Subscriber)} replaces the earlier one.
     */
    private static class DataMessageProcessor implements DataMessageSubscriber, Publisher<ByteBuffer> {
        private static final Logger logger = LoggerFactory.getLogger(DataMessageProcessor.class);
        private final String category;
        private final long sessionId;
        private volatile Subscription subscription;
        private volatile Subscriber<? super ByteBuffer> subscriber;

        private DataMessageProcessor(String category, long sessionId) {
            this.category = category;
            this.sessionId = sessionId;
        }

        /** Stores the subscription that is later handed to the downstream subscriber. */
        @Override // reactor.aeron.DataMessageSubscriber
        public void onSubscription(Subscription subscription) {
            this.subscription = subscription;
        }

        /**
         * Forwards a received buffer to the attached subscriber.
         *
         * <p>If no subscriber has been attached yet the buffer is dropped and a warning is
         * logged, instead of failing with a {@link NullPointerException}.
         */
        @Override // reactor.aeron.DataMessageSubscriber
        public void onNext(ByteBuffer byteBuffer) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received NEXT for sessionId: {}, buffer: {}", this.category, this.sessionId, byteBuffer);
            }
            // Read the volatile field once so the null check and the call see the same value.
            Subscriber<? super ByteBuffer> target = this.subscriber;
            if (target == null) {
                logger.warn("[{}] Dropping message for sessionId: {}: no subscriber attached", this.category, this.sessionId);
                return;
            }
            target.onNext(byteBuffer);
        }

        /**
         * Attaches {@code subscriber} and immediately passes it the stored subscription.
         *
         * <p>NOTE(review): if this is called before {@link #onSubscription(Subscription)}, the
         * subscriber receives {@code null} in {@code onSubscribe}; callers presumably subscribe
         * only after the data subscription has been established - confirm.
         */
        @Override // org.reactivestreams.Publisher
        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this.subscription);
        }
    }

    /**
     * Creates an inbound that is not yet started.
     *
     * @param str name of this inbound; used as the logging category and passed to
     *     {@link AeronResources} when the data subscription is requested
     * @param aeronResources resources used to create the data subscription
     */
    public DefaultAeronInbound(String str, AeronResources aeronResources) {
        this.name = str;
        this.resources = aeronResources;
    }

    /**
     * Returns a {@link Mono} that, when subscribed, creates the inbound flux and requests a data
     * subscription from the {@link AeronResources}.
     *
     * @param str passed through to {@code AeronResources.dataSubscription} (presumably the Aeron
     *     channel - confirm against that method)
     * @param i passed through to {@code AeronResources.dataSubscription} (presumably the stream
     *     id - confirm against that method)
     * @param j session id, used in this class only for log messages
     * @param runnable optional callback, may be {@code null}; it is run from the last handler
     *     passed to {@code dataSubscription} (presumably an image-unavailable handler - confirm)
     * @return a {@code Mono} that completes once the data subscription {@code Mono} has completed
     */
    public Mono<Void> start(String str, int i, long j, Runnable runnable) {
        return Mono.defer(() -> {
            DataMessageProcessor processor = new DataMessageProcessor(this.name, j);
            this.flux = new ByteBufferFlux(processor);
            return this.resources
                .dataSubscription(this.name, str, i, processor, this.resources.nextEventLoop(), null, image -> {
                    if (runnable != null) {
                        runnable.run();
                    }
                })
                .doOnSuccess(messageSubscription -> {
                    this.subscription = messageSubscription;
                    processor.onSubscription(messageSubscription);
                })
                .then();
        });
    }

    /**
     * Returns the flux of received buffers.
     *
     * @return the flux, or {@code null} if the {@code Mono} returned by
     *     {@link #start(String, int, long, Runnable)} has not been subscribed to yet
     */
    @Override // reactor.aeron.AeronInbound
    public ByteBufferFlux receive() {
        return this.flux;
    }

    /** Disposes the underlying message subscription, if one has been established. */
    public void dispose() {
        MessageSubscription current = this.subscription;
        if (current != null) {
            current.dispose();
        }
    }

    /**
     * Returns the dispose signal of the underlying message subscription.
     *
     * @return the subscription's {@code onDispose()} result, or an empty {@code Mono} when no
     *     subscription has been established
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        MessageSubscription current = this.subscription;
        return current != null ? current.onDispose() : Mono.empty();
    }

    /**
     * Reports whether the underlying message subscription has been disposed.
     *
     * @return {@code false} when no subscription has been established, otherwise the
     *     subscription's own {@code isDisposed()} value
     */
    public boolean isDisposed() {
        MessageSubscription current = this.subscription;
        return current != null && current.isDisposed();
    }
}
