package reactor.aeron.server;

import io.aeron.Subscription;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.Connection;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.HeartbeatSender;
import reactor.aeron.HeartbeatWatchdog;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/server/ServerHandler.class */
public final class ServerHandler implements ControlMessageSubscriber, OnDisposable {
    /** Logger shared by all instances of this class. */
    private static final Logger logger = Loggers.getLogger(ServerHandler.class);

    /**
     * Source of server-side session stream ids. Static, so it is shared by every
     * {@code ServerHandler} in the JVM; {@code onConnect} takes {@code incrementAndGet()},
     * which makes 1001 the first id handed out.
     */
    private static final AtomicInteger streamIdCounter = new AtomicInteger(1000);

    /** Settings this handler was created from (source of name, options, resources and handler). */
    private final AeronServerSettings settings;

    /** Server name taken from the settings; used as the "[{}]" prefix of log messages. */
    private final String category;

    private final AeronOptions options;
    private final AeronResources aeronResources;
    private final HeartbeatWatchdog heartbeatWatchdog;
    private final HeartbeatSender heartbeatSender;

    /** Control subscription this handler is registered on; closed in {@code dispose0()}. */
    private final Subscription controlSubscription;

    /** Per-handler session id source; {@code onConnect} takes {@code incrementAndGet()}, so ids start at 1. */
    private final AtomicLong nextSessionId = new AtomicLong(0);

    /** Sessions that completed {@code initialise()} successfully and are not yet disposed. */
    private final List<SessionHandler> handlers = new CopyOnWriteArrayList<>();

    /** Completed by {@code dispose()}; its termination triggers {@code dispose0()}. */
    private final MonoProcessor<Void> onClose = MonoProcessor.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/aeron/server/ServerHandler$SessionHandler.class */
    public class SessionHandler implements Connection {
        private final DefaultAeronOutbound outbound;
        private final AeronServerInbound inbound;
        private final String clientChannel;
        private final int clientSessionStreamId;
        private final int serverSessionStreamId;
        private final UUID connectRequestId;
        private final long sessionId;
        private final ServerConnector connector;
        private final Logger logger = Loggers.getLogger(SessionHandler.class);
        private final MonoProcessor<Void> onClose = MonoProcessor.create();

        SessionHandler(String str, int i, int i2, UUID uuid, long j, int i3) {
            this.clientSessionStreamId = i;
            this.clientChannel = str;
            this.outbound = new DefaultAeronOutbound(ServerHandler.this.category, ServerHandler.this.aeronResources, str, ServerHandler.this.options);
            this.connectRequestId = uuid;
            this.sessionId = j;
            this.serverSessionStreamId = i3;
            this.inbound = new AeronServerInbound(ServerHandler.this.category, ServerHandler.this.aeronResources);
            this.connector = new ServerConnector(ServerHandler.this.category, ServerHandler.this.aeronResources, str, i2, j, i3, uuid, ServerHandler.this.options, ServerHandler.this.heartbeatSender);
            this.onClose.doOnTerminate(this::dispose0).subscribe((Consumer) null, th -> {
                this.logger.warn("SessionHandler disposed with error: {}", th);
            });
        }

        Mono<? extends Connection> initialise() {
            return this.connector.connect().then(this.outbound.initialise(this.sessionId, this.clientSessionStreamId)).then(this.inbound.initialise(ServerHandler.this.category, ServerHandler.this.options.serverChannel(), this.serverSessionStreamId, this.sessionId, this::dispose)).thenReturn(this).doOnSuccess(sessionHandler -> {
                HeartbeatWatchdog heartbeatWatchdog = ServerHandler.this.heartbeatWatchdog;
                long j = this.sessionId;
                Runnable runnable = () -> {
                    ServerHandler.this.heartbeatWatchdog.remove(this.sessionId);
                    dispose();
                };
                AeronServerInbound aeronServerInbound = this.inbound;
                aeronServerInbound.getClass();
                heartbeatWatchdog.add(j, runnable, aeronServerInbound::lastSignalTimeNs);
                ServerHandler.this.handlers.add(this);
                this.logger.debug("[{}] Client with connectRequestId: {} successfully connected, sessionId: {}", new Object[]{ServerHandler.this.category, this.connectRequestId, Long.valueOf(this.sessionId)});
            }).doOnError(th -> {
                this.logger.debug("[{}] Failed to connect to the client for sessionId: {}", new Object[]{ServerHandler.this.category, Long.valueOf(this.sessionId), th});
                dispose();
            });
        }

        @Override // reactor.aeron.Connection
        public AeronInbound inbound() {
            return this.inbound;
        }

        @Override // reactor.aeron.Connection
        public AeronOutbound outbound() {
            return this.outbound;
        }

        @Override // reactor.aeron.OnDisposable
        public Mono<Void> onDispose() {
            return this.onClose;
        }

        public void dispose() {
            if (this.onClose.isDisposed()) {
                return;
            }
            this.onClose.onComplete();
        }

        public boolean isDisposed() {
            return this.onClose.isDisposed();
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("ServerSession{");
            sb.append("sessionId=").append(this.sessionId);
            sb.append(", clientChannel=").append(this.clientChannel);
            sb.append(", clientSessionStreamId=").append(this.clientSessionStreamId);
            sb.append(", serverSessionStreamId=").append(this.serverSessionStreamId);
            sb.append(", connectRequestId=").append(this.connectRequestId);
            sb.append('}');
            return sb.toString();
        }

        private void dispose0() {
            ServerHandler.this.handlers.remove(this);
            ServerHandler.this.heartbeatWatchdog.remove(this.sessionId);
            this.connector.dispose();
            this.outbound.dispose();
            this.inbound.dispose();
            this.logger.debug("[{}] Closed session with sessionId: {}", new Object[]{ServerHandler.this.category, Long.valueOf(this.sessionId)});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the handler from the given settings and registers it as the subscriber of the
     * server control subscription.
     *
     * @param aeronServerSettings source of the server name, options, Aeron resources and the
     *     connection handler
     */
    public ServerHandler(AeronServerSettings aeronServerSettings) {
        this.settings = aeronServerSettings;
        this.category = aeronServerSettings.name();
        this.options = aeronServerSettings.options();
        this.aeronResources = aeronServerSettings.aeronResources();

        // Create the heartbeat helpers BEFORE handing `this` to controlSubscription(...):
        // once registered, control callbacks (onHeartbeat, onConnect) may run and they read
        // these fields. Assigning them afterwards left a window in which they were still null.
        this.heartbeatWatchdog =
            new HeartbeatWatchdog(this.options.heartbeatTimeoutMillis(), this.category);
        this.heartbeatSender =
            new HeartbeatSender(this.options.heartbeatTimeoutMillis(), this.category);

        this.controlSubscription =
            aeronServerSettings
                .aeronResources()
                .controlSubscription(
                    this.category,
                    this.options.serverChannel(),
                    this.options.serverStreamId(),
                    "to receive control requests on",
                    0L,
                    this);

        // Only termination matters for a Mono<Void>, hence no value consumer. The Throwable is
        // the dedicated last argument, so the message has no "{}" placeholder (it would be
        // printed literally otherwise).
        this.onClose
            .doOnTerminate(this::dispose0)
            .subscribe(null, th -> logger.warn("ServerHandler disposed with error: ", th));
    }

    /**
     * Requests {@code Long.MAX_VALUE} elements from the subscription as soon as it is received.
     *
     * @param subscription subscription to request from
     */
    @Override // reactor.aeron.PollerSubscriber
    public void onSubscribe(org.reactivestreams.Subscription subscription) {
        final long demand = Long.MAX_VALUE;
        subscription.request(demand);
    }

    /**
     * Handles a CONNECT control message: allocates a server session stream id and a session id,
     * creates a {@link SessionHandler} and initialises it on {@code Schedulers.single()}.
     *
     * <p>Once the session is initialised, the connection is passed to the handler from the
     * settings and the resulting publisher is subscribed with the connection's dispose
     * subscriber. Failures are logged at error level.
     *
     * @param uuid connect request id
     * @param str client channel
     * @param i client control stream id
     * @param i2 client session stream id
     */
    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnect(UUID uuid, String str, int i, int i2) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                "[{}] Received {} for connectRequestId: {}, channel={}, clientControlStreamId={}, clientSessionStreamId={}",
                this.category,
                MessageType.CONNECT,
                uuid,
                AeronUtils.minifyChannel(str),
                i,
                i2);
        }

        final int serverSessionStreamId = streamIdCounter.incrementAndGet();
        final long sessionId = this.nextSessionId.incrementAndGet();

        // Note the argument order: the session stream id comes before the control stream id.
        final SessionHandler session =
            new SessionHandler(str, i2, i, uuid, sessionId, serverSessionStreamId);

        session
            .initialise()
            .subscribeOn(Schedulers.single())
            .subscribe(
                connection ->
                    this.settings.handler().apply(connection).subscribe(connection.disposeSubscriber()),
                th ->
                    logger.error(
                        "[{}] Occurred exception on connect to {}, sessionId: {}, connectRequestId: {}, clientSessionStreamId: {}, clientControlStreamId: {}, serverSessionStreamId: {}, error: ",
                        this.category,
                        str,
                        sessionId,
                        uuid,
                        i2,
                        i,
                        serverSessionStreamId,
                        th));
    }

    /**
     * CONNECT_ACK is not handled on the server side; the message is only logged as an error.
     *
     * @param uuid connect request id, included in the log message
     * @param j unused
     * @param i unused
     */
    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnectAck(UUID uuid, long j, int i) {
        logger.error(
            "[{}] Received unsupported server request {}, connectRequestId: {}",
            this.category,
            MessageType.CONNECT_ACK,
            uuid);
    }

    /**
     * Forwards a received heartbeat to the heartbeat watchdog.
     *
     * @param j id passed on to {@code HeartbeatWatchdog.heartbeatReceived}
     */
    @Override // reactor.aeron.ControlMessageSubscriber
    public void onHeartbeat(long j) {
        final HeartbeatWatchdog watchdog = this.heartbeatWatchdog;
        watchdog.heartbeatReceived(j);
    }

    /**
     * Handles a COMPLETE control message: logs it and disposes the first registered session
     * whose session id equals the given one, if there is any.
     *
     * @param j session id to look up
     */
    @Override // reactor.aeron.ControlMessageSubscriber
    public void onComplete(long j) {
        logger.info("[{}] Received {} for sessionId: {}", this.category, MessageType.COMPLETE, j);
        for (SessionHandler candidate : this.handlers) {
            if (candidate.sessionId == j) {
                candidate.dispose();
                // Only the first match is disposed.
                break;
            }
        }
    }

    /**
     * Completes {@code onClose} unless this handler already reports disposed; the actual
     * cleanup runs in {@code dispose0()} when {@code onClose} terminates.
     */
    public void dispose() {
        if (!isDisposed()) {
            this.onClose.onComplete();
        }
    }

    /**
     * Reports whether {@code onClose} is disposed.
     *
     * @return the value of {@code onClose.isDisposed()}
     */
    public boolean isDisposed() {
        final boolean closed = this.onClose.isDisposed();
        return closed;
    }

    /**
     * Exposes the close signal of this handler.
     *
     * @return the {@code onClose} processor, which {@code dispose()} completes
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        final Mono<Void> closeSignal = this.onClose;
        return closeSignal;
    }

    /**
     * Cleanup run when {@code onClose} terminates: disposes every registered session, empties
     * the session list and closes the control subscription.
     */
    private void dispose0() {
        for (SessionHandler session : this.handlers) {
            session.dispose();
        }
        this.handlers.clear();
        this.aeronResources.close(this.controlSubscription);
    }
}
