package reactor.aeron.mdc;

import io.aeron.Image;
import io.aeron.Subscription;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.Agent;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.AeronDuplex;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.DefaultAeronDuplex;
import reactor.aeron.DefaultFragmentMapper;
import reactor.aeron.ImageAgent;
import reactor.aeron.OnDisposable;
import reactor.aeron.PublicationAgent;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/mdc/AeronServerHandler.class */
final class AeronServerHandler implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronServerHandler.class);
    private static final int STREAM_ID = -889323520;
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super AeronDuplex<DirectBuffer>, ? extends Publisher<Void>> handler;
    private volatile Subscription acceptorSubscription;
    private final DefaultFragmentMapper mapper = new DefaultFragmentMapper();
    private final Map<Integer, OnDisposable> connections = new ConcurrentHashMap();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerHandler(AeronOptions aeronOptions) {
        this.options = aeronOptions;
        this.resources = aeronOptions.resources();
        this.handler = aeronOptions.handler();
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th -> {
            logger.warn("{} failed on doDispose(): {}", this, th.toString());
        }, () -> {
            logger.debug("Disposed {}", this);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<OnDisposable> start() {
        return Mono.defer(() -> {
            String asString = this.options.inboundUri().asString();
            logger.debug("Starting {} on: {}", this, asString);
            return this.resources.subscription(asString, STREAM_ID, this::onImageAvailable, this::onImageUnavailable).doOnSuccess(subscription -> {
                this.acceptorSubscription = subscription;
            }).thenReturn(this).doOnSuccess(aeronServerHandler -> {
                logger.debug("Started {} on: {}", this, asString);
            }).doOnError(th -> {
                logger.error("Failed to start {} on: {}", this, asString);
                dispose();
            });
        });
    }

    private void onImageAvailable(Image image) {
        int sessionId = image.sessionId();
        String asString = this.options.outboundUri().uri(channelUriStringBuilder -> {
            return channelUriStringBuilder.sessionId(Integer.valueOf(sessionId ^ Integer.MAX_VALUE));
        }).asString();
        logger.debug("{}: creating server connection: {}", Integer.toHexString(sessionId), asString);
        this.resources.publication(asString, STREAM_ID).map(publication -> {
            return new DefaultAeronDuplex(new ImageAgent(image, this.mapper, false), new PublicationAgent(publication));
        }).doOnSuccess(defaultAeronDuplex -> {
            if (this.handler == null) {
                logger.warn("{}: connection handler function is not specified", Integer.toHexString(sessionId));
            } else if (!defaultAeronDuplex.isDisposed()) {
                this.handler.apply(defaultAeronDuplex).subscribe(defaultAeronDuplex.disposeSubscriber());
            }
            AeronEventLoop nextEventLoop = this.resources.nextEventLoop();
            nextEventLoop.register((Agent) defaultAeronDuplex.inbound());
            nextEventLoop.register((Agent) defaultAeronDuplex.outbound());
            this.connections.put(Integer.valueOf(sessionId), defaultAeronDuplex);
            defaultAeronDuplex.onDispose(() -> {
                this.connections.remove(Integer.valueOf(sessionId));
            });
        }).doOnSuccess(defaultAeronDuplex2 -> {
            logger.debug("{}: created server connection: {}", Integer.toHexString(sessionId), asString);
        }).subscribe((Consumer) null, th -> {
            logger.warn("{}: failed to create server outbound, cause: {}", Integer.toHexString(sessionId), th.toString());
        });
    }

    private void onImageUnavailable(Image image) {
        logger.debug("{}: server inbound became unavailable", Integer.toHexString(image.sessionId()));
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.fromRunnable(() -> {
            logger.debug("Disposing {}", this);
            CloseHelper.quietClose(this.acceptorSubscription);
            this.connections.forEach((num, onDisposable) -> {
                onDisposable.dispose();
            });
        });
    }

    public String toString() {
        return "AeronServerHandler" + Integer.toHexString(System.identityHashCode(this));
    }
}
