package reactor.aeron.server;

import io.aeron.Subscription;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronResources;
import reactor.aeron.ByteBufferFlux;
import reactor.aeron.DataMessageSubscriber;
import reactor.aeron.MessageType;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.TopicProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/server/AeronServerInbound.class */
final class AeronServerInbound implements AeronInbound, Disposable {
    private final TopicProcessor<ByteBuffer> processor;
    private final ByteBufferFlux flux;
    private final AeronResources aeronResources;
    private Subscription serverDataSubscription;
    private ServerDataMessageProcessor messageProcessor;

    /* loaded from: input_file:reactor/aeron/server/AeronServerInbound$ServerDataMessageProcessor.class */
    static class ServerDataMessageProcessor implements DataMessageSubscriber, Publisher<ByteBuffer> {
        private static final Logger logger = Loggers.getLogger(ServerDataMessageProcessor.class);
        private final String category;
        private volatile org.reactivestreams.Subscription subscription;
        private volatile long lastSignalTimeNs;
        private volatile Subscriber<? super ByteBuffer> subscriber;
        private final long sessionId;
        private final Runnable onCompleteHandler;

        ServerDataMessageProcessor(String str, long j, Runnable runnable) {
            this.category = str;
            this.sessionId = j;
            this.onCompleteHandler = runnable;
        }

        @Override // reactor.aeron.PollerSubscriber
        public void onSubscribe(org.reactivestreams.Subscription subscription) {
            this.subscription = subscription;
        }

        @Override // reactor.aeron.DataMessageSubscriber
        public void onNext(long j, ByteBuffer byteBuffer) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}, buffer: {}", new Object[]{this.category, MessageType.NEXT, Long.valueOf(j), byteBuffer});
            }
            this.lastSignalTimeNs = System.nanoTime();
            if (this.sessionId == j) {
                this.subscriber.onNext(byteBuffer);
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.NEXT, Long.valueOf(j)});
            }
        }

        @Override // reactor.aeron.DataMessageSubscriber
        public void onComplete(long j) {
            if (logger.isTraceEnabled()) {
                logger.trace("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
            if (this.sessionId == j) {
                this.onCompleteHandler.run();
            } else {
                logger.error("[{}] Received {} for unexpected sessionId: {}", new Object[]{this.category, MessageType.COMPLETE, Long.valueOf(j)});
            }
        }

        public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
            this.subscriber = subscriber;
            subscriber.onSubscribe(this.subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerInbound(String str, AeronResources aeronResources) {
        this.processor = TopicProcessor.builder().name(str).build();
        this.aeronResources = aeronResources;
        this.flux = new ByteBufferFlux(this.processor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> initialise(String str, String str2, int i, long j, Runnable runnable) {
        return Mono.fromRunnable(() -> {
            this.messageProcessor = new ServerDataMessageProcessor(str, j, runnable);
            this.serverDataSubscription = this.aeronResources.dataSubscription(str, str2, i, "to receive client data on", j, this.messageProcessor);
            this.messageProcessor.subscribe(this.processor);
        });
    }

    @Override // reactor.aeron.AeronInbound
    public ByteBufferFlux receive() {
        return this.flux;
    }

    public void dispose() {
        this.processor.onComplete();
        this.aeronResources.close(this.serverDataSubscription);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long lastSignalTimeNs() {
        return this.messageProcessor.lastSignalTimeNs;
    }
}
