package reactor.aeron.client;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.aeron.AeronInbound;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronOutbound;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.Connection;
import reactor.aeron.ControlMessageSubscriber;
import reactor.aeron.DefaultAeronInbound;
import reactor.aeron.DefaultAeronOutbound;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.aeron.OnDisposable;
import reactor.aeron.Protocol;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/client/AeronClientConnector.class */
public final class AeronClientConnector implements ControlMessageSubscriber, OnDisposable {
    private static final int CONTROL_STREAM_ID = 1;
    private final String category;
    private final AeronOptions options;
    private final AeronResources resources;
    private final int clientControlStreamId;
    private final String clientChannel;
    private final Supplier<Integer> clientSessionStreamIdCounter;
    private final Map<Long, ConnectAckPromise> connectAckPromises = new ConcurrentHashMap();
    private final List<ClientHandler> handlers = new CopyOnWriteArrayList();
    private final MonoProcessor<Void> dispose = MonoProcessor.create();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    private static final Logger logger = Loggers.getLogger(AeronClientConnector.class);
    private static final AtomicLong connectRequestIdCounter = new AtomicLong(System.currentTimeMillis());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/client/AeronClientConnector$ClientHandler.class */
    public class ClientHandler implements Connection {
        private final DefaultAeronOutbound outbound;
        private final int clientSessionStreamId;
        private final long connectRequestId;
        private final String serverChannel;
        private final Mono<MessagePublication> controlPublication;
        private volatile long sessionId;
        private volatile int serverSessionStreamId;
        private volatile DefaultAeronInbound inbound;
        private final MonoProcessor<Void> dispose;
        private final MonoProcessor<Void> onDispose;

        private ClientHandler() {
            this.connectRequestId = AeronClientConnector.connectRequestIdCounter.incrementAndGet();
            this.dispose = MonoProcessor.create();
            this.onDispose = MonoProcessor.create();
            this.clientSessionStreamId = ((Integer) AeronClientConnector.this.clientSessionStreamIdCounter.get()).intValue();
            this.serverChannel = AeronClientConnector.this.options.serverChannel();
            this.inbound = new DefaultAeronInbound(AeronClientConnector.this.category, AeronClientConnector.this.resources);
            this.outbound = new DefaultAeronOutbound(AeronClientConnector.this.category, this.serverChannel, AeronClientConnector.this.resources, AeronClientConnector.this.options);
            this.controlPublication = Mono.defer(this::newControlPublication).cache();
            this.dispose.then(doDispose()).doFinally(signalType -> {
                this.onDispose.onComplete();
            }).subscribe((Consumer) null, th -> {
                AeronClientConnector.logger.warn("ClientHandler disposed with error: " + th);
            });
        }

        private Mono<MessagePublication> newControlPublication() {
            return AeronClientConnector.this.resources.messagePublication(AeronClientConnector.this.category, this.serverChannel, AeronClientConnector.CONTROL_STREAM_ID, AeronClientConnector.this.options, AeronClientConnector.this.resources.nextEventLoop());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<? extends Connection> start() {
            AeronClientConnector.this.handlers.add(this);
            return connect().flatMap(connectAckResponse -> {
                this.sessionId = connectAckResponse.sessionId;
                this.serverSessionStreamId = connectAckResponse.serverSessionStreamId;
                return this.inbound.start(AeronClientConnector.this.clientChannel, this.clientSessionStreamId, this.sessionId, this::dispose).then(this.outbound.start(this.sessionId, this.serverSessionStreamId)).thenReturn(this);
            }).doOnError(th -> {
                AeronClientConnector.logger.error("[{}] Occurred exception for sessionId: {} clientSessionStreamId: {}, error: ", new Object[]{AeronClientConnector.this.category, Long.valueOf(this.sessionId), Integer.valueOf(this.clientSessionStreamId), th});
                dispose();
            }).thenReturn(this);
        }

        private Mono<ConnectAckResponse> connect() {
            ConnectAckPromise connectAckPromise = (ConnectAckPromise) AeronClientConnector.this.connectAckPromises.computeIfAbsent(Long.valueOf(this.connectRequestId), j -> {
                return new ConnectAckPromise(j);
            });
            Mono doOnSuccess = sendConnectRequest().then(connectAckPromise.promise().timeout(AeronClientConnector.this.options.ackTimeout()).doOnError(th -> {
                AeronClientConnector.logger.warn("Failed to receive {} during {} millis", new Object[]{MessageType.CONNECT_ACK, Long.valueOf(AeronClientConnector.this.options.ackTimeout().toMillis())});
            })).doOnSuccess(connectAckResponse -> {
                AeronClientConnector.logger.debug("[{}] Successfully connected to server at {}, sessionId: {}", new Object[]{AeronClientConnector.this.category, AeronUtils.minifyChannel(this.serverChannel), Long.valueOf(connectAckResponse.sessionId)});
            });
            connectAckPromise.getClass();
            return doOnSuccess.doOnTerminate(connectAckPromise::dispose).doOnError(th2 -> {
                AeronClientConnector.logger.warn("Failed to connect to server at {}, cause: {}", new Object[]{AeronUtils.minifyChannel(this.serverChannel), th2.toString()});
            });
        }

        private Mono<Void> sendConnectRequest() {
            return this.controlPublication.flatMap(messagePublication -> {
                AeronClientConnector.logger.debug("[{}] Connecting to server at {}", new Object[]{AeronClientConnector.this.category, AeronUtils.minifyChannel(this.serverChannel)});
                return send(messagePublication, Protocol.createConnectBody(this.connectRequestId, AeronClientConnector.this.clientChannel, AeronClientConnector.this.clientControlStreamId, this.clientSessionStreamId), MessageType.CONNECT);
            });
        }

        private Mono<Void> sendDisconnectRequest() {
            return this.controlPublication.flatMap(messagePublication -> {
                AeronClientConnector.logger.debug("[{}] Disconnecting from server at {}", new Object[]{AeronClientConnector.this.category, AeronUtils.minifyChannel(this.serverChannel)});
                return send(messagePublication, Protocol.createDisconnectBody(this.sessionId), MessageType.DISCONNECT);
            });
        }

        private Mono<Void> send(MessagePublication messagePublication, ByteBuffer byteBuffer, MessageType messageType) {
            return messagePublication.enqueue(byteBuffer).doOnSuccess(r9 -> {
                AeronClientConnector.logger.debug("[{}] Sent {} to {}", new Object[]{AeronClientConnector.this.category, messageType, AeronUtils.minifyChannel(this.serverChannel)});
            }).doOnError(th -> {
                AeronClientConnector.logger.warn("[{}] Failed to send {} to {}, cause: {}", new Object[]{AeronClientConnector.this.category, messageType, AeronUtils.minifyChannel(this.serverChannel), th.toString()});
            });
        }

        @Override // reactor.aeron.Connection
        public AeronInbound inbound() {
            return this.inbound;
        }

        @Override // reactor.aeron.Connection
        public AeronOutbound outbound() {
            return this.outbound;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder("ClientSession{");
            sb.append("sessionId=").append(this.sessionId);
            sb.append(", clientChannel=").append(AeronClientConnector.this.clientChannel);
            sb.append(", serverChannel=").append(this.serverChannel);
            sb.append(", clientSessionStreamId=").append(this.clientSessionStreamId);
            sb.append(", serverSessionStreamId=").append(this.serverSessionStreamId);
            sb.append('}');
            return sb.toString();
        }

        public void dispose() {
            this.dispose.onComplete();
        }

        public boolean isDisposed() {
            return this.onDispose.isDisposed();
        }

        @Override // reactor.aeron.OnDisposable
        public Mono<Void> onDispose() {
            return this.onDispose;
        }

        private Mono<Void> doDispose() {
            return Mono.defer(() -> {
                AeronClientConnector.logger.debug("[{}] About to close session with sessionId: {}", new Object[]{AeronClientConnector.this.category, Long.valueOf(this.sessionId)});
                AeronClientConnector.this.handlers.remove(this);
                return sendDisconnectRequest().onErrorResume(th -> {
                    return Mono.empty();
                }).then(Mono.defer(() -> {
                    Optional.ofNullable(this.outbound).ifPresent((v0) -> {
                        v0.dispose();
                    });
                    Optional.ofNullable(this.inbound).ifPresent((v0) -> {
                        v0.dispose();
                    });
                    return Mono.whenDelayError(new Publisher[]{(Publisher) Optional.ofNullable(this.outbound).map((v0) -> {
                        return v0.onDispose();
                    }).orElse(Mono.empty()), (Publisher) Optional.ofNullable(this.inbound).map((v0) -> {
                        return v0.onDispose();
                    }).orElse(Mono.empty())}).doFinally(signalType -> {
                        AeronClientConnector.logger.debug("[{}] Closed session with sessionId: {}", new Object[]{AeronClientConnector.this.category, Long.valueOf(this.sessionId)});
                    });
                }));
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/client/AeronClientConnector$ConnectAckPromise.class */
    public class ConnectAckPromise implements Disposable {
        private final long connectRequestId;
        private final MonoProcessor<ConnectAckResponse> promise;

        private ConnectAckPromise(long j) {
            this.connectRequestId = j;
            this.promise = MonoProcessor.create();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<ConnectAckResponse> promise() {
            return this.promise;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void success(long j, int i) {
            this.promise.onNext(new ConnectAckResponse(j, i));
            this.promise.onComplete();
        }

        public void dispose() {
            AeronClientConnector.this.connectAckPromises.remove(Long.valueOf(this.connectRequestId));
            this.promise.cancel();
        }

        public boolean isDisposed() {
            return this.promise.isDisposed();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:reactor/aeron/client/AeronClientConnector$ConnectAckResponse.class */
    public static class ConnectAckResponse {
        private final long sessionId;
        private final int serverSessionStreamId;

        private ConnectAckResponse(long j, int i) {
            this.sessionId = j;
            this.serverSessionStreamId = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronClientConnector(AeronClientSettings aeronClientSettings, int i, Supplier<Integer> supplier) {
        this.options = aeronClientSettings.options();
        this.category = (String) Optional.ofNullable(aeronClientSettings.name()).orElse("client");
        this.resources = aeronClientSettings.aeronResources();
        this.clientChannel = this.options.clientChannel();
        this.clientControlStreamId = i;
        this.clientSessionStreamIdCounter = supplier;
        this.dispose.then(doDispose()).doFinally(signalType -> {
            this.onDispose.onComplete();
        }).subscribe((Consumer) null, th -> {
            logger.warn("AeronClientConnector disposed with error: " + th);
        });
    }

    public Mono<Connection> start() {
        return Mono.defer(() -> {
            return new ClientHandler().start();
        });
    }

    private void dispose(long j) {
        this.handlers.stream().filter(clientHandler -> {
            return clientHandler.sessionId == j;
        }).findFirst().ifPresent((v0) -> {
            v0.dispose();
        });
    }

    public void dispose() {
        this.dispose.onComplete();
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    private Mono<Void> doDispose() {
        return Mono.defer(() -> {
            return Mono.whenDelayError((Iterable) this.handlers.stream().map(clientHandler -> {
                clientHandler.dispose();
                return clientHandler.onDispose();
            }).collect(Collectors.toList()));
        });
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onSubscription(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnectAck(long j, long j2, int i) {
        logger.debug("[{}] Received {} for connectRequestId: {}, serverSessionStreamId: {}", new Object[]{this.category, MessageType.CONNECT_ACK, Long.valueOf(j), Integer.valueOf(i)});
        ConnectAckPromise remove = this.connectAckPromises.remove(Long.valueOf(j));
        if (remove != null) {
            remove.success(j2, i);
        }
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onDisconnect(long j) {
        logger.info("[{}] Received {} for sessionId: {}", new Object[]{this.category, MessageType.DISCONNECT, Long.valueOf(j)});
        dispose(j);
    }

    @Override // reactor.aeron.ControlMessageSubscriber
    public void onConnect(long j, String str, int i, int i2) {
        logger.error("[{}] Unsupported {} request for a client, clientChannel: {}, clientControlStreamId: {}, clientSessionStreamId: {}", new Object[]{this.category, MessageType.CONNECT, str, Integer.valueOf(i), Integer.valueOf(i2)});
    }
}
