package reactor.aeron;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:reactor/aeron/DefaultAeronOutbound.class */
public final class DefaultAeronOutbound implements AeronOutbound, OnDisposable {
    private static final Logger logger = Loggers.getLogger(DefaultAeronOutbound.class);
    private static final RuntimeException NOT_CONNECTED_EXCEPTION = new RuntimeException("publication is not connected");
    private final String category;
    private final String channel;
    private final AeronResources resources;
    private final AeronOptions options;
    private volatile AeronWriteSequencer sequencer;
    private volatile MessagePublication publication;

    public DefaultAeronOutbound(String str, String str2, AeronResources aeronResources, AeronOptions aeronOptions) {
        this.category = str;
        this.channel = str2;
        this.resources = aeronResources;
        this.options = aeronOptions;
    }

    public Mono<Void> start(long j, int i) {
        return Mono.defer(() -> {
            return this.resources.messagePublication(this.category, this.channel, i, this.options, this.resources.nextEventLoop()).doOnSuccess(messagePublication -> {
                this.publication = messagePublication;
                this.sequencer = new AeronWriteSequencer(j, this.publication);
            }).flatMap(messagePublication2 -> {
                Duration ofMillis = Duration.ofMillis(100L);
                Duration connectTimeout = this.options.connectTimeout();
                long millis = connectTimeout.toMillis() / ofMillis.toMillis();
                MessagePublication messagePublication2 = this.publication;
                messagePublication2.getClass();
                return Mono.fromCallable(messagePublication2::isConnected).filter(bool -> {
                    return bool.booleanValue();
                }).switchIfEmpty(Mono.error(NOT_CONNECTED_EXCEPTION)).retryBackoff(millis, ofMillis, ofMillis).timeout(connectTimeout).then().onErrorResume(th -> {
                    logger.warn("Failed to connect publication {} for sending data during {}", new Object[]{this.publication, connectTimeout});
                    dispose();
                    return Mono.error(th);
                });
            });
        });
    }

    @Override // reactor.aeron.AeronOutbound
    public AeronOutbound send(Publisher<? extends ByteBuffer> publisher) {
        Objects.requireNonNull(this.sequencer, "sequencer must be initialized");
        return then(this.sequencer.write(publisher));
    }

    @Override // reactor.aeron.AeronOutbound
    public Mono<Void> then() {
        return Mono.empty();
    }

    public void dispose() {
        if (this.publication != null) {
            this.publication.dispose();
        }
    }

    public boolean isDisposed() {
        return this.publication != null && this.publication.isDisposed();
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.publication != null ? this.publication.onDispose() : Mono.empty();
    }
}
