package reactor.aeron;

import io.aeron.Image;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/AeronServerHandler.class */
final class AeronServerHandler implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(AeronServerHandler.class);
    private static final int STREAM_ID = -889323520;
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super AeronConnection, ? extends Publisher<Void>> handler;
    private volatile MessageSubscription acceptorSubscription;
    private final Map<Integer, MonoProcessor<Void>> disposeHooks = new ConcurrentHashMap();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronServerHandler(AeronOptions aeronOptions) {
        this.options = aeronOptions;
        this.resources = aeronOptions.resources();
        this.handler = aeronOptions.handler();
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th -> {
            logger.warn("{} failed on doDispose(): {}", this, th.toString());
        }, () -> {
            logger.debug("Disposed {}", this);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<OnDisposable> start() {
        return Mono.defer(() -> {
            String asString = this.options.inboundUri().asString();
            logger.debug("Starting {} on: {}", this, asString);
            return this.resources.subscription(asString, STREAM_ID, this::onImageAvailable, this::onImageUnavailable).doOnSuccess(messageSubscription -> {
                this.acceptorSubscription = messageSubscription;
            }).thenReturn(this).doOnSuccess(aeronServerHandler -> {
                logger.debug("Started {} on: {}", this, asString);
            }).doOnError(th -> {
                logger.error("Failed to start {} on: {}", this, asString);
                dispose();
            });
        });
    }

    private void onImageAvailable(Image image) {
        int sessionId = image.sessionId();
        String asString = this.options.outboundUri().uri(channelUriStringBuilder -> {
            return channelUriStringBuilder.sessionId(Integer.valueOf(sessionId));
        }).asString();
        logger.debug("{}: creating server connection: {}", Integer.toHexString(sessionId), asString);
        this.resources.publication(asString, STREAM_ID, this.options).flatMap(messagePublication -> {
            return this.resources.inbound(image, null).doOnError(th -> {
                messagePublication.dispose();
            }).flatMap(defaultAeronInbound -> {
                return newConnection(sessionId, messagePublication, defaultAeronInbound);
            });
        }).doOnSuccess(aeronConnection -> {
            logger.debug("{}: created server connection: {}", Integer.toHexString(sessionId), asString);
        }).subscribe((Consumer) null, th -> {
            logger.warn("{}: failed to create server outbound, cause: {}", Integer.toHexString(sessionId), th.toString());
        });
    }

    private Mono<? extends AeronConnection> newConnection(int i, MessagePublication messagePublication, DefaultAeronInbound defaultAeronInbound) {
        MonoProcessor<Void> create = MonoProcessor.create();
        this.disposeHooks.put(Integer.valueOf(i), create);
        DuplexAeronConnection duplexAeronConnection = new DuplexAeronConnection(i, defaultAeronInbound, new DefaultAeronOutbound(messagePublication), create);
        return duplexAeronConnection.start(this.handler).doOnError(th -> {
            duplexAeronConnection.dispose();
            this.disposeHooks.remove(Integer.valueOf(i));
        });
    }

    private void onImageUnavailable(Image image) {
        int sessionId = image.sessionId();
        MonoProcessor<Void> remove = this.disposeHooks.remove(Integer.valueOf(sessionId));
        if (remove != null) {
            logger.debug("{}: server inbound became unavailable", Integer.toHexString(sessionId));
            remove.onComplete();
        }
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);
            ArrayList arrayList = new ArrayList();
            arrayList.add(Optional.ofNullable(this.acceptorSubscription).map(messageSubscription -> {
                messageSubscription.getClass();
                return Mono.fromRunnable(messageSubscription::dispose).then(messageSubscription.onDispose());
            }).orElse(Mono.empty()));
            Stream<MonoProcessor<Void>> peek = this.disposeHooks.values().stream().peek((v0) -> {
                v0.onComplete();
            });
            arrayList.getClass();
            peek.forEach((v1) -> {
                r1.add(v1);
            });
            return Mono.whenDelayError(arrayList).doFinally(signalType -> {
                this.disposeHooks.clear();
            });
        });
    }

    public String toString() {
        return "AeronServerHandler" + Integer.toHexString(System.identityHashCode(this));
    }
}
