package reactor.aeron.mdc;

import io.aeron.Publication;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.Agent;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.aeron.AeronDuplex;
import reactor.aeron.AeronEventLoop;
import reactor.aeron.DefaultAeronDuplex;
import reactor.aeron.DefaultFragmentMapper;
import reactor.aeron.ImageAgent;
import reactor.aeron.PublicationAgent;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:reactor/aeron/mdc/AeronClientConnector.class */
final class AeronClientConnector {
    private static final Logger logger = LoggerFactory.getLogger(AeronClientConnector.class);
    private static final int STREAM_ID = -889323520;
    private final AeronOptions options;
    private final AeronResources resources;
    private final Function<? super AeronDuplex<DirectBuffer>, ? extends Publisher<Void>> handler;
    private final DefaultFragmentMapper mapper = new DefaultFragmentMapper();

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronClientConnector(AeronOptions aeronOptions) {
        this.options = aeronOptions;
        this.resources = aeronOptions.resources();
        this.handler = aeronOptions.handler();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<AeronDuplex<DirectBuffer>> start() {
        return Mono.defer(() -> {
            return tryConnect().flatMap(publication -> {
                int sessionId = publication.sessionId();
                String asString = this.options.inboundUri().uri(channelUriStringBuilder -> {
                    return channelUriStringBuilder.sessionId(Integer.valueOf(sessionId ^ Integer.MAX_VALUE));
                }).asString();
                logger.debug("{}: creating client connection: {}", Integer.toHexString(sessionId), asString);
                MonoProcessor create = MonoProcessor.create();
                return this.resources.subscription(asString, STREAM_ID, image -> {
                    logger.debug("{}: created client inbound", Integer.toHexString(sessionId));
                    create.onNext(image);
                }, image2 -> {
                    logger.debug("{}: client inbound became unavaliable", Integer.toHexString(sessionId));
                }).doOnError(th -> {
                    logger.warn("{}: failed to create client inbound, cause: {}", Integer.toHexString(sessionId), th.toString());
                    CloseHelper.quietClose(publication);
                }).flatMap(subscription -> {
                    return create;
                }).map(image3 -> {
                    return new DefaultAeronDuplex(new ImageAgent(image3, this.mapper, true), new PublicationAgent(publication));
                }).doOnSuccess(defaultAeronDuplex -> {
                    if (this.handler == null) {
                        logger.warn("{}: connection handler function is not specified", Integer.toHexString(sessionId));
                    } else if (!defaultAeronDuplex.isDisposed()) {
                        this.handler.apply(defaultAeronDuplex).subscribe(defaultAeronDuplex.disposeSubscriber());
                    }
                    AeronEventLoop nextEventLoop = this.resources.nextEventLoop();
                    nextEventLoop.register((Agent) defaultAeronDuplex.inbound());
                    nextEventLoop.register((Agent) defaultAeronDuplex.outbound());
                }).doOnSuccess(defaultAeronDuplex2 -> {
                    logger.debug("{}: created client connection: {}", Integer.toHexString(sessionId), asString);
                });
            });
        });
    }

    private Mono<Publication> tryConnect() {
        return Mono.defer(() -> {
            return Mono.fromCallable(this::getOutboundChannel).flatMap(str -> {
                return this.resources.publication(str, STREAM_ID);
            }).flatMap(publication -> {
                return ensureConnected(publication).doOnError(th -> {
                    CloseHelper.quietClose(publication);
                });
            }).retryBackoff(this.options.connectRetryCount(), Duration.ZERO, this.options.connectTimeout()).doOnError(th -> {
                logger.warn("aeron.Publication is not connected after several retries");
            });
        });
    }

    private Mono<Publication> ensureConnected(Publication publication) {
        return Mono.defer(() -> {
            Duration connectTimeout = this.options.connectTimeout();
            Duration ofMillis = Duration.ofMillis(100L);
            return ensureConnected0(publication).retryBackoff(Math.max(connectTimeout.toMillis() / ofMillis.toMillis(), 1L), ofMillis, ofMillis).doOnError(th -> {
                logger.warn("aeron.Publication is not connected after several retries");
            }).thenReturn(publication);
        });
    }

    private Mono<Void> ensureConnected0(Publication publication) {
        return Mono.defer(() -> {
            return publication.isConnected() ? Mono.empty() : Mono.error(AeronExceptions.failWithPublication("aeron.Publication is not connected"));
        });
    }

    private String getOutboundChannel() {
        AeronChannelUriString outboundUri = this.options.outboundUri();
        Supplier<Integer> sessionIdGenerator = this.options.sessionIdGenerator();
        return (sessionIdGenerator == null || outboundUri.builder().sessionId() != null) ? outboundUri.asString() : outboundUri.uri(channelUriStringBuilder -> {
            return channelUriStringBuilder.sessionId((Integer) sessionIdGenerator.get());
        }).asString();
    }
}
