package reactor.aeron;

import io.aeron.Publication;
import io.aeron.logbuffer.BufferClaim;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Queue;
import java.util.function.Consumer;
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.MonoSink;

/* loaded from: input_file:reactor/aeron/MessagePublication.class */
/**
 * Queues outbound messages for a single Aeron {@link Publication} and attempts to send the
 * message at the head of the queue every time {@link #proceed()} is called.
 *
 * <p>Each message is enqueued together with the {@link MonoSink} of the {@link Mono} returned by
 * {@link #enqueue(ByteBuffer)}; that sink is completed once the message has been accepted by the
 * publication, or failed when the message could not be published within the configured timeout.
 *
 * <p>NOTE(review): the task queue is Agrona's many-producer/single-consumer queue, so
 * {@link #proceed()} and {@link #close()} are presumably meant to be called from the event loop
 * thread only. {@link #close()} checks this explicitly; {@link #proceed()} does not -- confirm
 * against the callers.
 */
public class MessagePublication implements OnDisposable {
    private static final Logger logger = LoggerFactory.getLogger(MessagePublication.class);

    private final Publication publication;
    private final AeronEventLoop eventLoop;
    private final Duration connectTimeout;
    private final Duration backpressureTimeout;
    private final Duration adminActionTimeout;
    // One reusable BufferClaim per publishing thread.
    private final ThreadLocal<BufferClaim> bufferClaims = ThreadLocal.withInitial(BufferClaim::new);
    private final Queue<PublishTask> publishTasks = new ManyToOneConcurrentLinkedQueue<>();
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();

    /**
     * A single queued message together with the sink that reports the outcome of publishing it.
     */
    public class PublishTask {
        private final ByteBuffer msgBody;
        private final MonoSink<Void> sink;
        // Set once the sink is disposed or once this task has signalled a terminal failure itself.
        private volatile boolean isDisposed;
        // Wall-clock time (ms) of the first publish attempt; 0 means "not attempted yet".
        private long start;

        private PublishTask(ByteBuffer byteBuffer, MonoSink<Void> monoSink) {
            this.msgBody = byteBuffer;
            this.sink = monoSink.onDispose(() -> this.isDisposed = true);
        }

        /**
         * Tries to publish the message once.
         *
         * <p>Messages shorter than the publication's max payload length are written through
         * {@code tryClaim}; longer messages are handed to {@code offer}. Only the bytes between
         * the buffer's position and limit are sent; the buffer's position is not modified.
         *
         * @return a positive value when the task needs no further attempts (published, already
         *     disposed, or failed terminally while copying into the claimed buffer); otherwise
         *     the non-positive result code returned by the publication
         */
        public long publish() {
            if (this.isDisposed) {
                // Nobody is waiting for the outcome; report "done" so the task is dequeued.
                return 1L;
            }
            if (this.start == 0) {
                this.start = System.currentTimeMillis();
            }

            final int length = this.msgBody.remaining();
            final int position = this.msgBody.position();

            if (length >= MessagePublication.this.publication.maxPayloadLength()) {
                // The third argument is a length, not an end index.
                return MessagePublication.this.publication.offer(
                        new UnsafeBuffer(this.msgBody, position, length));
            }

            final BufferClaim bufferClaim = MessagePublication.this.bufferClaims.get();
            final long result = MessagePublication.this.publication.tryClaim(length, bufferClaim);
            if (result > 0) {
                try {
                    // The last argument is a length, not an end index.
                    bufferClaim.buffer().putBytes(bufferClaim.offset(), this.msgBody, position, length);
                    bufferClaim.commit();
                } catch (Exception e) {
                    bufferClaim.abort();
                    logger.warn(
                            "{} failed to copy message into claimed buffer: {}",
                            MessagePublication.this,
                            e.toString());
                    // The claim was aborted, so the message was NOT sent: fail the sink and mark
                    // the task terminal so that the caller's subsequent success() is a no-op.
                    error(e);
                    this.isDisposed = true;
                }
            }
            return result;
        }

        /**
         * Tells whether more than {@code duration} has passed since the first publish attempt.
         *
         * @param duration the allowed time budget
         * @return {@code true} if the elapsed time since the first attempt exceeds the budget
         */
        public boolean isTimeoutElapsed(Duration duration) {
            return System.currentTimeMillis() - this.start > duration.toMillis();
        }

        /** Completes the sink unless this task is already disposed. */
        public void success() {
            if (!this.isDisposed) {
                this.sink.success();
            }
        }

        /**
         * Fails the sink unless this task is already disposed.
         *
         * @param th the failure to signal
         */
        public void error(Throwable th) {
            if (!this.isDisposed) {
                this.sink.error(th);
            }
        }
    }

    /**
     * Creates a message publication on top of the given Aeron publication.
     *
     * @param publication the Aeron publication messages are sent through
     * @param aeronOptions source of the connect, backpressure and admin-action timeouts
     * @param aeronEventLoop event loop that owns this publication
     */
    public MessagePublication(Publication publication, AeronOptions aeronOptions, AeronEventLoop aeronEventLoop) {
        this.publication = publication;
        this.eventLoop = aeronEventLoop;
        this.connectTimeout = aeronOptions.connectTimeout();
        this.backpressureTimeout = aeronOptions.backpressureTimeout();
        this.adminActionTimeout = aeronOptions.adminActionTimeout();
    }

    /**
     * Queues a message for publishing.
     *
     * @param byteBuffer the message; the bytes between its position and limit are sent
     * @return a {@link Mono} that, once subscribed, enqueues the message and terminates with the
     *     publishing outcome; it fails immediately if this publication is disposed or the queue
     *     rejects the task
     */
    public Mono<Void> enqueue(ByteBuffer byteBuffer) {
        return Mono.create(monoSink -> {
            boolean accepted =
                    !isDisposed() && this.publishTasks.offer(new PublishTask(byteBuffer, monoSink));
            if (!accepted) {
                monoSink.error(AeronExceptions.failWithMessagePublicationUnavailable());
            }
        });
    }

    /**
     * Makes one publishing attempt for the task at the head of the queue.
     *
     * <p>On success the task is dequeued and completed. If the publication reports
     * {@code CLOSED} or {@code MAX_POSITION_EXCEEDED}, disposal of this publication is requested
     * and the task stays queued. For {@code NOT_CONNECTED}, {@code BACK_PRESSURED} and
     * {@code ADMIN_ACTION} the task stays queued for a later attempt until the matching timeout
     * has elapsed, at which point it is dequeued and failed.
     *
     * @return 1 if a task was dequeued as done, 0 otherwise
     */
    public int proceed() {
        final PublishTask task = this.publishTasks.peek();
        if (task == null) {
            return 0;
        }

        final long result = task.publish();
        if (result > 0) {
            this.publishTasks.poll();
            task.success();
            return 1;
        }

        if (result == Publication.CLOSED) {
            logger.warn("aeron.Publication is CLOSED: {}", this);
            dispose();
            return 0;
        }
        if (result == Publication.MAX_POSITION_EXCEEDED) {
            logger.warn("aeron.Publication received MAX_POSITION_EXCEEDED: {}", this);
            dispose();
            return 0;
        }

        final RuntimeException failure = timeoutFailure(result, task);
        if (failure != null) {
            this.publishTasks.poll();
            task.error(failure);
        }
        return 0;
    }

    /**
     * Maps a retryable result code whose timeout has elapsed to the exception the task should
     * fail with; returns {@code null} when the task should simply be retried later.
     */
    private RuntimeException timeoutFailure(long result, PublishTask task) {
        if (result == Publication.NOT_CONNECTED && task.isTimeoutElapsed(this.connectTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve NOT_CONNECTED within {} ms, {}",
                    this.connectTimeout.toMillis(),
                    this);
            return AeronExceptions.failWithPublication("Failed to resolve NOT_CONNECTED within timeout");
        }
        if (result == Publication.BACK_PRESSURED && task.isTimeoutElapsed(this.backpressureTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve BACK_PRESSURED within {} ms, {}",
                    this.backpressureTimeout.toMillis(),
                    this);
            return AeronExceptions.failWithPublication("Failed to resolve BACK_PRESSURED within timeout");
        }
        if (result == Publication.ADMIN_ACTION && task.isTimeoutElapsed(this.adminActionTimeout)) {
            logger.warn(
                    "aeron.Publication failed to resolve ADMIN_ACTION within {} ms, {}",
                    this.adminActionTimeout.toMillis(),
                    this);
            return AeronExceptions.failWithPublication("Failed to resolve ADMIN_ACTION within timeout");
        }
        return null;
    }

    /**
     * Closes the underlying Aeron publication, fails every still-queued task with a cancellation
     * error and completes {@link #onDispose()}.
     *
     * <p>The queued tasks are failed and {@link #onDispose()} is completed on every exit path,
     * including when this method throws.
     *
     * @throws IllegalStateException if called from outside the event loop
     */
    public void close() {
        try {
            if (!this.eventLoop.inEventLoop()) {
                throw new IllegalStateException("Can only close aeron publication from within event loop");
            }
            try {
                this.publication.close();
                logger.debug("Disposed {}", this);
            } catch (Exception e) {
                logger.warn("{} failed on aeron.Publication close(): {}", this, e.toString());
                throw Exceptions.propagate(e);
            }
        } finally {
            disposePublishTasks();
            this.onDispose.onComplete();
        }
    }

    /**
     * Returns the session id of the underlying publication.
     *
     * @return the Aeron session id
     */
    public int sessionId() {
        return this.publication.sessionId();
    }

    /**
     * Tells whether the underlying publication has been closed.
     *
     * @return {@code true} if the Aeron publication is closed
     */
    public boolean isDisposed() {
        return this.publication.isClosed();
    }

    /**
     * Asks the event loop to dispose this publication. The request is fire-and-forget: its
     * result and any error are intentionally ignored here.
     */
    public void dispose() {
        this.eventLoop.disposePublication(this).subscribe(null, th -> {
            // no-op: disposal errors are deliberately not propagated from here
        });
    }

    /**
     * Returns the disposal signal of this publication.
     *
     * @return a {@link Mono} that completes when {@link #close()} has run
     */
    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }

    /**
     * Waits until the underlying publication is connected.
     *
     * <p>The connection state is re-checked with a 100 ms backoff, for at most
     * {@code connectTimeout / 100 ms} retries (at least one), and the whole wait is additionally
     * bounded by {@code connectTimeout}.
     *
     * @return a {@link Mono} emitting this instance once connected, or failing if the
     *     publication is still not connected when the retries or the timeout are exhausted
     */
    public Mono<MessagePublication> ensureConnected() {
        return Mono.defer(() -> {
            final Duration retryInterval = Duration.ofMillis(100L);
            final long retries = Math.max(this.connectTimeout.toMillis() / retryInterval.toMillis(), 1L);
            return ensureConnected0()
                    .retryBackoff(retries, retryInterval, retryInterval)
                    .timeout(this.connectTimeout)
                    .doOnError(th -> logger.warn("aeron.Publication is not connected after several retries"))
                    .thenReturn(this);
        });
    }

    /** Single connectivity check: empty when connected, error otherwise. */
    private Mono<Void> ensureConnected0() {
        return Mono.defer(() -> {
            return this.publication.isConnected()
                    ? Mono.empty()
                    : Mono.error(AeronExceptions.failWithPublication("aeron.Publication is not connected"));
        });
    }

    /** Drains the queue, failing every remaining task with a cancellation error. */
    private void disposePublishTasks() {
        PublishTask task;
        while ((task = this.publishTasks.poll()) != null) {
            try {
                task.error(AeronExceptions.failWithCancel("PublishTask has cancelled"));
            } catch (Exception e) {
                // Best effort: one misbehaving sink must not prevent the remaining tasks from
                // being cancelled, so the failure is only logged.
                logger.debug("{} failed to cancel a publish task: {}", this, e.toString());
            }
        }
    }

    @Override
    public String toString() {
        return "MessagePublication{pub=" + this.publication.channel() + "}";
    }
}
