package reactor.aeron.client;

import io.aeron.Subscription;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.Connection;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.HeartbeatSender;
import reactor.aeron.HeartbeatWatchdog;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/client/AeronClientConnector.class */
/**
 * Creates client-side {@link Connection}s and owns the state they share: one control
 * subscription, one {@link HeartbeatSender} and one {@link HeartbeatWatchdog}.
 *
 * <p>Every connector takes its control stream id from a static counter; every
 * {@link ClientHandler} takes its session stream id from the same counter.
 */
public final class AeronClientConnector implements Disposable {
    private static final Logger logger = Loggers.getLogger(AeronClientConnector.class);

    /** Static source of stream ids, shared by all connectors and all of their handlers. */
    private static final AtomicInteger streamIdCounter = new AtomicInteger();

    private final String name;
    private final AeronClientOptions options;
    private final AeronResources aeronResources;
    private final ClientControlMessageSubscriber controlMessageSubscriber;
    private final Subscription controlSubscription;
    private final HeartbeatSender heartbeatSender;
    private final HeartbeatWatchdog heartbeatWatchdog;

    /** Handlers registered by {@link ClientHandler#initialise()} and removed again on their disposal. */
    private final List<ClientHandler> handlers = new CopyOnWriteArrayList<>();

    /** Assigned before the constructor body runs, which uses it to open the control subscription. */
    private final int clientControlStreamId = streamIdCounter.incrementAndGet();

    /**
     * A single client connection: a {@link ClientConnector} used for the connect handshake,
     * an outbound created up front, and an inbound created once the server acknowledges the
     * connection.
     */
    public class ClientHandler implements Connection {
        private final DefaultAeronOutbound outbound;
        private final ClientConnector connector;

        /** Remains {@code null} until the connect acknowledgement has been received. */
        private volatile AeronClientInbound inbound;

        /** Taken from the connect acknowledgement; holds the default value 0 before that. */
        private volatile long sessionId;

        /** Taken from the connect acknowledgement; holds the default value 0 before that. */
        private volatile int serverSessionStreamId;

        /** Completed by {@link #dispose()}; its termination triggers {@link #dispose0()}. */
        private final MonoProcessor<Void> onClose = MonoProcessor.create();

        private final int clientSessionStreamId = streamIdCounter.incrementAndGet();

        /**
         * Builds the outbound and the connector, and subscribes to {@code onClose} so that
         * {@link #dispose0()} runs as soon as it terminates.
         */
        ClientHandler() {
            this.outbound =
                    new DefaultAeronOutbound(name, aeronResources, options.serverChannel(), options);
            this.connector =
                    new ClientConnector(
                            name,
                            aeronResources,
                            options,
                            controlMessageSubscriber,
                            heartbeatSender,
                            clientControlStreamId,
                            this.clientSessionStreamId);
            // No value consumer is needed (the Mono is Mono<Void>); only errors are reported.
            this.onClose
                    .doOnTerminate(this::dispose0)
                    .subscribe(
                            (Consumer<Void>) null,
                            th -> logger.warn("SessionHandler disposed with error: {}", th));
        }

        /**
         * Registers this handler with the connector and returns a {@link Mono} that performs
         * the connect handshake, creates the inbound, registers the session with the heartbeat
         * watchdog and initialises the outbound.
         *
         * <p>Registration in the handler list happens immediately when this method is called,
         * not when the returned {@link Mono} is subscribed. On error the failure is logged and
         * this handler is disposed.
         *
         * @return a {@link Mono} emitting this handler once the outbound is initialised
         */
        Mono<Connection> initialise() {
            handlers.add(this);
            return this.connector
                    .connect()
                    .flatMap(
                            connectAckResponse -> {
                                this.sessionId = connectAckResponse.sessionId;
                                this.serverSessionStreamId = connectAckResponse.serverSessionStreamId;
                                this.inbound =
                                        new AeronClientInbound(
                                                name,
                                                aeronResources,
                                                options.clientChannel(),
                                                this.clientSessionStreamId,
                                                this.sessionId,
                                                this::dispose);
                                // The lambda re-reads the volatile inbound field on every call.
                                heartbeatWatchdog.add(
                                        this.sessionId,
                                        this::dispose,
                                        () -> this.inbound.getLastSignalTimeNs());
                                return this.outbound.initialise(this.sessionId, this.serverSessionStreamId);
                            })
                    .doOnError(
                            th -> {
                                // The trailing Throwable has no placeholder of its own.
                                logger.error(
                                        "[{}] Occurred exception for sessionId: {} clientSessionStreamId: {}, error: ",
                                        name,
                                        this.sessionId,
                                        this.clientSessionStreamId,
                                        th);
                                dispose();
                            })
                    .then(Mono.just(this));
        }

        /**
         * Returns the inbound side of this connection.
         *
         * @return the inbound, or {@code null} if the connect acknowledgement has not arrived yet
         */
        @Override // reactor.aeron.Connection
        public AeronInbound inbound() {
            return this.inbound;
        }

        /**
         * Returns the outbound side of this connection.
         *
         * @return the outbound created in the constructor
         */
        @Override // reactor.aeron.Connection
        public AeronOutbound outbound() {
            return this.outbound;
        }

        /**
         * Returns the close signal of this connection.
         *
         * @return the {@code onClose} processor, which is completed by {@link #dispose()}
         */
        @Override // reactor.aeron.OnDisposable
        public Mono<Void> onDispose() {
            return this.onClose;
        }

        /**
         * Completes {@code onClose} unless it already reports itself as disposed; the actual
         * cleanup then runs in {@link #dispose0()}.
         */
        // NOTE(review): presumably implements Disposable#dispose via Connection - confirm before adding @Override.
        public void dispose() {
            if (this.onClose.isDisposed()) {
                return;
            }
            this.onClose.onComplete();
        }

        /**
         * Reports whether this handler has been closed.
         *
         * @return the disposed state of the {@code onClose} processor
         */
        public boolean isDisposed() {
            return this.onClose.isDisposed();
        }

        /**
         * Describes this session by its ids and channels.
         *
         * @return a string of the form {@code ClientSession{sessionId=..., ...}}
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("ClientSession{");
            sb.append("sessionId=").append(this.sessionId);
            sb.append(", clientChannel=").append(options.clientChannel());
            sb.append(", serverChannel=").append(options.serverChannel());
            sb.append(", clientSessionStreamId=").append(this.clientSessionStreamId);
            sb.append(", serverSessionStreamId=").append(this.serverSessionStreamId);
            sb.append('}');
            return sb.toString();
        }

        /**
         * Performs the cleanup for this handler: unregisters it from the connector and the
         * heartbeat watchdog, then disposes the connector, the inbound (if one was created)
         * and the outbound. Invoked when {@code onClose} terminates.
         */
        public void dispose0() {
            handlers.remove(this);
            heartbeatWatchdog.remove(this.sessionId);
            this.connector.dispose();
            // The inbound does not exist if the handshake never completed.
            if (this.inbound != null) {
                this.inbound.dispose();
            }
            this.outbound.dispose();
            logger.debug("[{}] Closed session with Id: {}", name, this.sessionId);
        }
    }

    /**
     * Creates the heartbeat watchdog, the control message subscriber, the heartbeat sender and
     * the control subscription from the given settings.
     *
     * @param aeronClientSettings supplies the options, the Aeron resources and an optional name;
     *     a {@code null} name is replaced by {@code "client"}
     */
    public AeronClientConnector(AeronClientSettings aeronClientSettings) {
        this.options = aeronClientSettings.options();
        this.name = Optional.ofNullable(aeronClientSettings.name()).orElse("client");
        this.aeronResources = aeronClientSettings.aeronResources();
        this.heartbeatWatchdog = new HeartbeatWatchdog(this.options.heartbeatTimeoutMillis(), this.name);
        // A lambda rather than a method reference: dispose is overloaded in this class.
        this.controlMessageSubscriber =
                new ClientControlMessageSubscriber(
                        this.name, this.heartbeatWatchdog, sessionId -> dispose(sessionId));
        this.heartbeatSender = new HeartbeatSender(this.options.heartbeatTimeoutMillis(), this.name);
        this.controlSubscription =
                this.aeronResources.controlSubscription(
                        this.name,
                        this.options.clientChannel(),
                        this.clientControlStreamId,
                        "to receive control requests on",
                        0L,
                        this.controlMessageSubscriber);
    }

    /**
     * Creates a new {@link ClientHandler} and starts its initialisation.
     *
     * @return the {@link Mono} returned by {@link ClientHandler#initialise()}
     */
    public Mono<Connection> newHandler() {
        return new ClientHandler().initialise();
    }

    /**
     * Disposes every registered handler, clears the handler list and closes the control
     * subscription.
     */
    // NOTE(review): heartbeatSender and heartbeatWatchdog are not released here - confirm whether they need to be.
    @Override
    public void dispose() {
        this.handlers.forEach(ClientHandler::dispose);
        this.handlers.clear();
        this.aeronResources.close(this.controlSubscription);
    }

    /**
     * Disposes the first registered handler whose session id equals the given one; does
     * nothing if there is no such handler.
     *
     * @param sessionId the session id to look up
     */
    private void dispose(long sessionId) {
        this.handlers.stream()
                .filter(handler -> handler.sessionId == sessionId)
                .findFirst()
                .ifPresent(ClientHandler::dispose);
    }
}
