package reactor.ipc.aeron.client;

import io.aeron.Subscription;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.ipc.aeron.AeronConnector;
import reactor.ipc.aeron.AeronInbound;
import reactor.ipc.aeron.AeronOutbound;
import reactor.ipc.aeron.AeronWrapper;
import reactor.ipc.aeron.DefaultAeronOutbound;
import reactor.ipc.aeron.HeartbeatSender;
import reactor.ipc.aeron.HeartbeatWatchdog;
import reactor.ipc.aeron.Pooler;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/ipc/aeron/client/AeronClient.class */
/**
 * Client-side {@link AeronConnector}.
 *
 * <p>On construction the client creates its shared infrastructure: a heartbeat watchdog and
 * sender, an {@link AeronWrapper}, a control subscription on the configured client channel and a
 * {@link Pooler} that the control subscription is registered with. Each call to
 * {@link #newHandler(BiFunction)} creates a {@link ClientHandler} that connects and then runs the
 * supplied I/O handler. {@link #dispose()} disposes every live handler and then releases the
 * shared infrastructure.
 */
public final class AeronClient implements AeronConnector, Disposable {
    private static final Logger logger = Loggers.getLogger(AeronClient.class);

    /**
     * Source of stream ids. Shared by all clients in the JVM; used both for the per-client control
     * stream id and for the per-handler session stream id.
     */
    private static final AtomicInteger streamIdCounter = new AtomicInteger();

    private final AeronClientOptions options;
    /** Name handed to the collaborators created by this client; never {@code null}. */
    private final String name;
    private final Pooler pooler;
    private final AeronWrapper wrapper;
    private final ClientControlMessageSubscriber controlMessageSubscriber;
    private final Subscription controlSubscription;
    private final int clientControlStreamId;
    private final HeartbeatSender heartbeatSender;
    /** Handlers that have been initialised and not yet disposed. */
    private final List<ClientHandler> handlers = new CopyOnWriteArrayList<>();
    private final HeartbeatWatchdog heartbeatWatchdog;

    /**
     * One client session: owns the connector, the inbound and the outbound of a single
     * {@link AeronClient#newHandler(BiFunction)} call.
     */
    class ClientHandler implements Disposable {
        private final BiFunction<? super AeronInbound, ? super AeronOutbound, ? extends Publisher<Void>> ioHandler;
        private final DefaultAeronOutbound outbound;
        // Field initialiser: the id is taken before the constructor body builds the connector.
        private final int clientSessionStreamId = AeronClient.streamIdCounter.incrementAndGet();
        private final ClientConnector connector;
        /** Stays {@code null} until a connect acknowledgement has been processed. */
        private volatile AeronClientInbound inbound;
        /** Keeps its default of {@code 0} until a connect acknowledgement has been processed. */
        private volatile long sessionId;

        /**
         * Creates the outbound (targeting the configured server channel) and the connector for
         * this session. Nothing is connected until {@link #initialise()} is subscribed to.
         *
         * @param biFunction I/O handler invoked with the inbound and outbound once connected
         */
        ClientHandler(BiFunction<? super AeronInbound, ? super AeronOutbound, ? extends Publisher<Void>> biFunction) {
            this.ioHandler = biFunction;
            this.outbound = new DefaultAeronOutbound(
                    AeronClient.this.name,
                    AeronClient.this.wrapper,
                    AeronClient.this.options.serverChannel(),
                    AeronClient.this.options);
            this.connector = new ClientConnector(
                    AeronClient.this.name,
                    AeronClient.this.wrapper,
                    AeronClient.this.options,
                    AeronClient.this.controlMessageSubscriber,
                    AeronClient.this.heartbeatSender,
                    AeronClient.this.clientControlStreamId,
                    this.clientSessionStreamId);
        }

        /**
         * Registers this handler with the client and builds the connect pipeline.
         *
         * <p>When the connector yields an acknowledgement, the inbound is created, the session id
         * is recorded, the session is added to the heartbeat watchdog (with {@link #dispose()} as
         * its callback) and the outbound is initialised. On success the I/O handler is applied and
         * its publisher subscribed; this handler is disposed when that publisher terminates. If
         * any of the above fails, this handler is disposed.
         *
         * @return a {@link Mono} emitting this handler once the pipeline above has completed
         */
        Mono<Disposable> initialise() {
            // Registered up front so AeronClient.dispose() also reaches handlers still connecting.
            AeronClient.this.handlers.add(this);
            return this.connector.connect().flatMap(connectAckResponse -> {
                this.inbound = new AeronClientInbound(
                        AeronClient.this.pooler,
                        AeronClient.this.wrapper,
                        AeronClient.this.options.clientChannel(),
                        this.clientSessionStreamId,
                        connectAckResponse.sessionId);
                this.sessionId = connectAckResponse.sessionId;
                AeronClient.this.heartbeatWatchdog.add(
                        connectAckResponse.sessionId,
                        this::dispose,
                        () -> this.inbound.getLastSignalTimeNs());
                return this.outbound.initialise(connectAckResponse.sessionId, connectAckResponse.serverSessionStreamId);
            }).doOnSuccess(ignored -> {
                // Fire-and-forget: the handler's lifetime is bound to the I/O publisher.
                Mono.from(this.ioHandler.apply(this.inbound, this.outbound))
                        .doOnTerminate(this::dispose)
                        .subscribe();
            }).doOnError(error -> {
                dispose();
            }).then(Mono.just(this));
        }

        /**
         * Releases everything owned by this session: disposes the connector, deregisters from the
         * client's handler list and from the heartbeat watchdog, then disposes the inbound (if one
         * was created) and the outbound.
         *
         * <p>NOTE(review): this method is reachable from several paths (I/O publisher termination,
         * connect failure, the watchdog callback, {@link AeronClient#dispose()} and the caller's
         * own {@link Disposable}) and has no guard against repeated invocation; it presumably
         * relies on the individual dispose/remove calls tolerating repeats - confirm.
         */
        @Override
        public void dispose() {
            this.connector.dispose();
            AeronClient.this.handlers.remove(this);
            AeronClient.this.heartbeatWatchdog.remove(this.sessionId);
            if (this.inbound != null) {
                this.inbound.dispose();
            }
            this.outbound.dispose();
            // Only log when a session id was actually assigned by a connect acknowledgement.
            if (this.sessionId > 0) {
                AeronClient.logger.debug("[{}] Closed session with Id: {}", AeronClient.this.name, this.sessionId);
            }
        }
    }

    /**
     * Builds the client and its shared infrastructure.
     *
     * @param str client name; {@code null} is replaced by {@code "client"}
     * @param consumer callback that customises a freshly created {@link AeronClientOptions}
     */
    private AeronClient(String str, Consumer<AeronClientOptions> consumer) {
        AeronClientOptions aeronClientOptions = new AeronClientOptions();
        consumer.accept(aeronClientOptions);
        this.options = aeronClientOptions;
        this.name = str == null ? "client" : str;
        this.heartbeatWatchdog = new HeartbeatWatchdog(aeronClientOptions.heartbeatTimeoutMillis(), this.name);
        // Pass the defaulted name, not the raw argument, so the subscriber never receives null
        // and is named consistently with every other collaborator created here.
        this.controlMessageSubscriber = new ClientControlMessageSubscriber(this.name, this.heartbeatWatchdog);
        this.clientControlStreamId = streamIdCounter.incrementAndGet();
        this.heartbeatSender = new HeartbeatSender(aeronClientOptions.heartbeatTimeoutMillis(), this.name);
        this.wrapper = new AeronWrapper(this.name, aeronClientOptions);
        this.controlSubscription = this.wrapper.addSubscription(
                aeronClientOptions.clientChannel(),
                this.clientControlStreamId,
                "to receive control requests on",
                0L);
        Pooler pooler = new Pooler(this.name);
        pooler.addControlSubscription(this.controlSubscription, this.controlMessageSubscriber);
        pooler.initialise();
        this.pooler = pooler;
    }

    /**
     * Creates a client with the given name and options.
     *
     * @param str client name; {@code null} is replaced by {@code "client"}
     * @param consumer callback that customises the client's options
     * @return a new client
     */
    public static AeronClient create(String str, Consumer<AeronClientOptions> consumer) {
        return new AeronClient(str, consumer);
    }

    /**
     * Creates a client with the given name and unmodified default options.
     *
     * @param str client name; {@code null} is replaced by {@code "client"}
     * @return a new client
     */
    public static AeronClient create(String str) {
        return create(str, aeronClientOptions -> {
        });
    }

    /**
     * Creates a client with the default name {@code "client"} and unmodified default options.
     *
     * @return a new client
     */
    public static AeronClient create() {
        return create(null);
    }

    /**
     * Creates a new {@link ClientHandler} for the given I/O handler and starts initialising it.
     *
     * @param biFunction I/O handler invoked with the session's inbound and outbound
     * @return a {@link Mono} emitting a {@link Disposable} that closes the session
     */
    @Override // reactor.ipc.aeron.AeronConnector
    public Mono<? extends Disposable> newHandler(BiFunction<AeronInbound, AeronOutbound, ? extends Publisher<Void>> biFunction) {
        return new ClientHandler(biFunction).initialise();
    }

    /**
     * Disposes every registered handler, removes the control subscription from the pooler, blocks
     * until the pooler has shut down, then closes the control subscription and disposes the
     * wrapper.
     */
    @Override
    public void dispose() {
        this.handlers.forEach(ClientHandler::dispose);
        this.pooler.removeSubscription(this.controlSubscription);
        // Blocks the calling thread until pooler shutdown has completed.
        this.pooler.shutdown().block();
        this.controlSubscription.close();
        this.wrapper.dispose();
    }
}
