package reactor.ipc.aeron.client;

import io.aeron.Publication;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.ipc.aeron.AeronUtils;
import reactor.ipc.aeron.AeronWrapper;
import reactor.ipc.aeron.DefaultMessagePublication;
import reactor.ipc.aeron.HeartbeatSender;
import reactor.ipc.aeron.MessageType;
import reactor.ipc.aeron.Protocol;
import reactor.ipc.aeron.client.ClientControlMessageSubscriber;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/ipc/aeron/client/ClientConnector.class */
final class ClientConnector implements Disposable {
    private static final Logger logger = Loggers.getLogger(ClientConnector.class);
    private final String category;
    private final AeronClientOptions options;
    private final ClientControlMessageSubscriber controlMessageSubscriber;
    private final int clientControlStreamId;
    private final int clientSessionStreamId;
    private final Publication serverControlPublication;
    private final HeartbeatSender heartbeatSender;
    private volatile long sessionId;
    private volatile Disposable heartbeatSenderDisposable = () -> {
    };
    private final UUID connectRequestId = UuidUtils.create();

    /* JADX INFO: Access modifiers changed from: package-private */
    public ClientConnector(String str, AeronWrapper aeronWrapper, AeronClientOptions aeronClientOptions, ClientControlMessageSubscriber clientControlMessageSubscriber, HeartbeatSender heartbeatSender, int i, int i2) {
        this.category = str;
        this.options = aeronClientOptions;
        this.controlMessageSubscriber = clientControlMessageSubscriber;
        this.clientControlStreamId = i;
        this.clientSessionStreamId = i2;
        this.heartbeatSender = heartbeatSender;
        this.serverControlPublication = aeronWrapper.addPublication(aeronClientOptions.serverChannel(), aeronClientOptions.serverStreamId(), "to send control requests to server", 0L);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<ClientControlMessageSubscriber.ConnectAckResponse> connect() {
        ClientControlMessageSubscriber.ConnectAckSubscription subscribeForConnectAck = this.controlMessageSubscriber.subscribeForConnectAck(this.connectRequestId);
        Mono doOnSuccess = sendConnectRequest().then(subscribeForConnectAck.connectAck().timeout(this.options.ackTimeout()).onErrorMap(TimeoutException.class, timeoutException -> {
            throw new RuntimeException(String.format("Failed to receive %s during %d millis", MessageType.CONNECT_ACK, Long.valueOf(this.options.ackTimeout().toMillis())), timeoutException);
        })).doOnSuccess(connectAckResponse -> {
            this.sessionId = connectAckResponse.sessionId;
            this.heartbeatSenderDisposable = this.heartbeatSender.scheduleHeartbeats(this.serverControlPublication, this.sessionId).subscribe(r1 -> {
            }, th -> {
            });
            if (logger.isDebugEnabled()) {
                logger.debug("[{}] Successfully connected to server at {}, sessionId: {}", new Object[]{this.category, AeronUtils.format(this.serverControlPublication), Long.valueOf(this.sessionId)});
            }
        });
        subscribeForConnectAck.getClass();
        return doOnSuccess.doOnTerminate(subscribeForConnectAck::dispose).onErrorMap(th -> {
            throw new RuntimeException(String.format("Failed to connect to server at %s", AeronUtils.format(this.serverControlPublication)));
        });
    }

    private Mono<Void> sendConnectRequest() {
        return Mono.fromRunnable(this::logConnect).then(send(Protocol.createConnectBody(this.connectRequestId, this.options.clientChannel(), this.clientControlStreamId, this.clientSessionStreamId), MessageType.CONNECT));
    }

    private void logConnect() {
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Connecting to server at {}", new Object[]{this.category, AeronUtils.format(this.serverControlPublication)});
        }
    }

    private Mono<Void> sendDisconnectRequest() {
        return Mono.fromRunnable(this::logDisconnect).then(send(Protocol.createDisconnectBody(this.sessionId), MessageType.COMPLETE));
    }

    private void logDisconnect() {
        if (logger.isDebugEnabled()) {
            logger.debug("[{}] Disconnecting from server at {}", new Object[]{this.category, AeronUtils.format(this.serverControlPublication)});
        }
    }

    private Mono<Void> send(ByteBuffer byteBuffer, MessageType messageType) {
        return Mono.create(monoSink -> {
            Exception exc = null;
            try {
                DefaultMessagePublication defaultMessagePublication = new DefaultMessagePublication(this.serverControlPublication, this.category, this.options.connectTimeoutMillis(), this.options.controlBackpressureTimeoutMillis());
                if (defaultMessagePublication.publish(messageType, byteBuffer, this.sessionId) > 0) {
                    logger.debug("[{}] Sent {} to {}", new Object[]{this.category, messageType, defaultMessagePublication.asString()});
                    monoSink.success();
                    return;
                }
            } catch (Exception e) {
                exc = e;
            }
            monoSink.error(new RuntimeException("Failed to send message of type: " + messageType, exc));
        });
    }

    public void dispose() {
        sendDisconnectRequest().subscribe(r1 -> {
        }, th -> {
        });
        this.heartbeatSenderDisposable.dispose();
        this.serverControlPublication.close();
    }
}
