package reactor.aeron.server;

import io.aeron.Publication;
import java.util.UUID;
import java.util.concurrent.Callable;
import reactor.aeron.AeronOptions;
import reactor.aeron.AeronResources;
import reactor.aeron.AeronUtils;
import reactor.aeron.DefaultMessagePublication;
import reactor.aeron.HeartbeatSender;
import reactor.aeron.MessagePublication;
import reactor.aeron.MessageType;
import reactor.aeron.Protocol;
import reactor.aeron.RetryTask;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/server/ServerConnector.class */
public class ServerConnector implements Disposable {
    private static final Logger logger = Loggers.getLogger(ServerConnector.class);
    private final String category;
    private final Publication clientControlPublication;
    private final int serverSessionStreamId;
    private final UUID connectRequestId;
    private final AeronOptions options;
    private final long sessionId;
    private final HeartbeatSender heartbeatSender;
    private final AeronResources aeronResources;
    private volatile Disposable heartbeatSenderDisposable = () -> {
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:reactor/aeron/server/ServerConnector$SendConnectAckTask.class */
    public class SendConnectAckTask implements Callable<Boolean> {
        private final MessagePublication publication;
        private final MonoSink<?> sink;

        SendConnectAckTask(MonoSink<?> monoSink) {
            this.sink = monoSink;
            this.publication = new DefaultMessagePublication(ServerConnector.this.clientControlPublication, ServerConnector.this.category, 0L, 0L);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws Exception {
            long publish = this.publication.publish(MessageType.CONNECT_ACK, Protocol.createConnectAckBody(ServerConnector.this.connectRequestId, ServerConnector.this.serverSessionStreamId), ServerConnector.this.sessionId);
            if (publish > 0) {
                ServerConnector.logger.debug("[{}] Sent {} to {}", new Object[]{ServerConnector.this.category, MessageType.CONNECT_ACK, ServerConnector.this.category, this.publication.asString()});
                this.sink.success();
                return true;
            }
            if (publish == -4) {
                throw new RuntimeException(String.format("Publication %s has been closed", this.publication.asString()));
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ServerConnector(String str, AeronResources aeronResources, String str2, int i, long j, int i2, UUID uuid, AeronOptions aeronOptions, HeartbeatSender heartbeatSender) {
        this.category = str;
        this.serverSessionStreamId = i2;
        this.connectRequestId = uuid;
        this.options = aeronOptions;
        this.sessionId = j;
        this.heartbeatSender = heartbeatSender;
        this.aeronResources = aeronResources;
        this.clientControlPublication = aeronResources.publication(str, str2, i, "to send control requests to client", j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> connect() {
        return Mono.create(monoSink -> {
            createConnectRetryTask(monoSink).schedule();
        }).then(Mono.fromRunnable(() -> {
            this.heartbeatSenderDisposable = scheduleHearbeats();
        }));
    }

    private Disposable scheduleHearbeats() {
        return this.heartbeatSender.scheduleHeartbeats(this.clientControlPublication, this.sessionId).subscribe(r1 -> {
        }, th -> {
        });
    }

    private RetryTask createConnectRetryTask(MonoSink<Object> monoSink) {
        return new RetryTask(Schedulers.single(), 100L, this.options.connectTimeoutMillis() + this.options.controlBackpressureTimeoutMillis(), new SendConnectAckTask(monoSink), th -> {
            monoSink.error(new RuntimeException(String.format("Failed to send %s into %s", MessageType.CONNECT_ACK, AeronUtils.format(this.clientControlPublication)), th));
        });
    }

    public void dispose() {
        this.heartbeatSenderDisposable.dispose();
        this.aeronResources.close(this.clientControlPublication);
    }
}
