package reactor.aeron;

import java.util.Optional;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/DefaultAeronConnection.class */
/**
 * {@link Connection} implementation that groups an inbound, an outbound, a publication and an
 * optional subscription under a single session id and tears them down together.
 *
 * <p>Disposal is driven by two processors: completing {@code dispose} triggers
 * {@link #doDispose()}, and {@code onDispose} is completed once that teardown has terminated
 * (whatever the terminating signal was).
 */
public final class DefaultAeronConnection implements Connection {
    /** Shared by all instances; a logger carries no per-connection state. */
    private static final Logger logger = LoggerFactory.getLogger(DefaultAeronConnection.class);

    private final int sessionId;
    private final DefaultAeronInbound inbound;
    private final DefaultAeronOutbound outbound;
    private final MessagePublication publication;
    /** May be {@code null}: the four-argument constructor supplies no subscription. */
    private final MessageSubscription subscription;
    /** Completed by {@link #dispose()} to start the teardown. */
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    /** Completed after the teardown pipeline terminates; exposed through {@link #onDispose()}. */
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * Creates a connection without a subscription.
     *
     * @param sessionId session id, used in {@link #toString()}
     * @param inbound inbound side, disposed together with the connection
     * @param outbound outbound side returned by {@link #outbound()}
     * @param publication publication, disposed together with the connection
     */
    public DefaultAeronConnection(int sessionId, DefaultAeronInbound inbound, DefaultAeronOutbound outbound, MessagePublication publication) {
        this(sessionId, inbound, outbound, null, publication);
    }

    /**
     * Creates a connection and wires the disposal pipeline.
     *
     * @param sessionId session id, used in {@link #toString()}
     * @param inbound inbound side, disposed together with the connection
     * @param outbound outbound side returned by {@link #outbound()}
     * @param subscription subscription to dispose with the connection, or {@code null} if none
     * @param publication publication, disposed together with the connection
     */
    public DefaultAeronConnection(int sessionId, DefaultAeronInbound inbound, DefaultAeronOutbound outbound, MessageSubscription subscription, MessagePublication publication) {
        this.sessionId = sessionId;
        this.inbound = inbound;
        this.outbound = outbound;
        this.subscription = subscription;
        this.publication = publication;

        // doDispose() only builds a deferred Mono here; the teardown itself runs after
        // `dispose` completes. `onDispose` is completed on any terminal signal so that
        // listeners are released even when the teardown fails.
        this.dispose
            .then(doDispose())
            .doFinally(signalType -> this.onDispose.onComplete())
            .subscribe(
                null,
                th -> logger.warn("{} failed on doDispose(): {}", this, th.toString()),
                () -> logger.debug("Disposed {}", this));
    }

    @Override // reactor.aeron.Connection
    public AeronInbound inbound() {
        return this.inbound;
    }

    @Override // reactor.aeron.Connection
    public AeronOutbound outbound() {
        return this.outbound;
    }

    /**
     * Requests disposal by completing the internal {@code dispose} trigger; the teardown is
     * performed by the pipeline subscribed in the constructor.
     */
    // NOTE(review): presumably inherited from reactor.core.Disposable via Connection — confirm
    // before adding @Override.
    public void dispose() {
        this.dispose.onComplete();
    }

    /**
     * Returns the disposed state reported by the {@code onDispose} processor.
     *
     * @return {@code onDispose.isDisposed()}
     */
    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    /**
     * Returns the processor that is completed once the teardown pipeline has terminated.
     *
     * @return the {@code onDispose} processor as a {@code Mono<Void>}
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    /**
     * Builds the deferred teardown: disposes the inbound, the subscription (when present) and
     * the publication, combining them with {@code Mono.whenDelayError}. For the subscription
     * and the publication the step also chains their own {@code onDispose()} Mono.
     *
     * @return a lazily assembled {@code Mono<Void>} representing the teardown
     */
    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            logger.debug("Disposing {}", this);
            return Mono.whenDelayError(
                Mono.fromRunnable(this.inbound::dispose),
                Optional.ofNullable(this.subscription)
                    .map(sub -> Mono.fromRunnable(sub::dispose).then(sub.onDispose()))
                    .orElse(Mono.empty()),
                Mono.fromRunnable(this.publication::dispose).then(this.publication.onDispose()));
        });
    }

    /**
     * Returns {@code "DefaultAeronConnection0x"} followed by the session id in hexadecimal.
     */
    @Override
    public String toString() {
        return "DefaultAeronConnection0x" + Integer.toHexString(this.sessionId);
    }
}
