package reactor.aeron;

import io.aeron.Publication;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.agrona.CloseHelper;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentTerminationException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.SignalType;

/* loaded from: input_file:reactor/aeron/PublicationAgent.class */
public final class PublicationAgent implements Agent, AeronOutbound, Disposable {
    private static final Logger logger = LoggerFactory.getLogger(PublicationAgent.class);
    private static final AtomicReferenceFieldUpdater<PublicationAgent, PublisherProcessor[]> PUBLISHER_PROCESSORS = AtomicReferenceFieldUpdater.newUpdater(PublicationAgent.class, PublisherProcessor[].class, "publisherProcessors");
    private final Publication publication;
    private volatile Throwable lastError;
    private final Duration connectTimeout = Duration.ofSeconds(5);
    private final Duration backpressureTimeout = Duration.ofSeconds(5);
    private final Duration adminActionTimeout = Duration.ofSeconds(5);
    private volatile PublisherProcessor[] publisherProcessors = new PublisherProcessor[0];
    private final MonoProcessor<Void> onDispose = MonoProcessor.create();
    private volatile boolean isDisposed = false;

    /* loaded from: input_file:reactor/aeron/PublicationAgent$PublisherProcessor.class */
    private static class PublisherProcessor<B> extends BaseSubscriber<B> {
        private final DirectBufferHandler<? super B> bufferHandler;
        private final PublicationAgent parent;
        private long start;
        private boolean requested;
        private final MonoProcessor<Void> onDispose = MonoProcessor.create();
        private volatile B buffer;
        private volatile Throwable error;

        PublisherProcessor(DirectBufferHandler<? super B> directBufferHandler, PublicationAgent publicationAgent) {
            this.bufferHandler = directBufferHandler;
            this.parent = publicationAgent;
            addSelf();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<Void> onDispose() {
            return this.onDispose;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void request() {
            Subscription upstream;
            if (this.requested || isDisposed() || (upstream = upstream()) == null) {
                return;
            }
            this.requested = true;
            upstream.request(1L);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void reset() {
            resetBuffer();
            if (!isDisposed()) {
                this.start = 0L;
                this.requested = false;
                return;
            }
            removeSelf();
            if (this.error != null) {
                this.onDispose.onError(this.error);
            } else {
                this.onDispose.onComplete();
            }
        }

        protected void hookOnSubscribe(Subscription subscription) {
        }

        protected void hookOnNext(B b) {
            if (this.buffer == null) {
                this.buffer = b;
                return;
            }
            try {
                this.bufferHandler.dispose(b);
            } catch (Exception e) {
                PublicationAgent.logger.warn("Failed to release buffer: {}", b, e);
            }
            throw Exceptions.failWithOverflow("PublisherProcessor is overrun by more signals than expected");
        }

        protected void hookOnError(Throwable th) {
            this.error = th;
        }

        protected void hookFinally(SignalType signalType) {
            if (this.buffer == null) {
                reset();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long publish(B b) {
            if (this.start == 0) {
                this.start = System.currentTimeMillis();
            }
            return this.parent.publication.offer(this.bufferHandler.map(b));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isTimeoutElapsed(Duration duration) {
            return System.currentTimeMillis() - this.start > duration.toMillis();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void cancelDueTo(Throwable th) {
            try {
                cancel();
                resetBuffer();
                this.onDispose.onError(th);
            } catch (Exception e) {
            }
        }

        private void resetBuffer() {
            B b = this.buffer;
            this.buffer = null;
            if (b != null) {
                try {
                    this.bufferHandler.dispose(b);
                } catch (Exception e) {
                    PublicationAgent.logger.warn("Failed to release buffer: {}", b, e);
                }
            }
        }

        private void addSelf() {
            PublisherProcessor[] publisherProcessorArr;
            do {
                publisherProcessorArr = this.parent.publisherProcessors;
            } while (!PublicationAgent.PUBLISHER_PROCESSORS.compareAndSet(this.parent, publisherProcessorArr, (PublisherProcessor[]) ArrayUtil.add(publisherProcessorArr, this)));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeSelf() {
            PublisherProcessor[] publisherProcessorArr;
            do {
                publisherProcessorArr = this.parent.publisherProcessors;
            } while (!PublicationAgent.PUBLISHER_PROCESSORS.compareAndSet(this.parent, publisherProcessorArr, (PublisherProcessor[]) ArrayUtil.remove(publisherProcessorArr, this)));
        }
    }

    public PublicationAgent(Publication publication) {
        this.publication = (Publication) Objects.requireNonNull(publication, "publication cannot be null");
    }

    public void onStart() {
    }

    public int doWork() throws Exception {
        if (this.isDisposed || this.publication.isClosed()) {
            logger.warn("aeron.Publication is CLOSED: {}", this);
            throw new AgentTerminationException("aeron.Publication is CLOSED");
        }
        int i = 0;
        AgentTerminationException agentTerminationException = null;
        for (PublisherProcessor publisherProcessor : this.publisherProcessors) {
            publisherProcessor.request();
            Object obj = publisherProcessor.buffer;
            if (obj != null) {
                try {
                    long publish = publisherProcessor.publish(obj);
                    if (publish <= 0) {
                        if (publish == -4) {
                            logger.warn("aeron.Publication is CLOSED: {}", this);
                            agentTerminationException = new AgentTerminationException("aeron.Publication is CLOSED");
                        }
                        if (publish == -5) {
                            logger.warn("aeron.Publication received MAX_POSITION_EXCEEDED: {}", this);
                            agentTerminationException = new AgentTerminationException("aeron.Publication received MAX_POSITION_EXCEEDED");
                        }
                        if (publish == -1 && publisherProcessor.isTimeoutElapsed(this.connectTimeout)) {
                            logger.warn("aeron.Publication failed to resolve NOT_CONNECTED within {} ms, {}", Long.valueOf(this.connectTimeout.toMillis()), this);
                            agentTerminationException = new AgentTerminationException("Failed to resolve NOT_CONNECTED within timeout");
                        }
                        if (publish == -2 && publisherProcessor.isTimeoutElapsed(this.backpressureTimeout)) {
                            logger.warn("aeron.Publication failed to resolve BACK_PRESSURED within {} ms, {}", Long.valueOf(this.backpressureTimeout.toMillis()), this);
                            agentTerminationException = new AgentTerminationException("Failed to resolve BACK_PRESSURED within timeout");
                        }
                        if (publish == -3 && publisherProcessor.isTimeoutElapsed(this.adminActionTimeout)) {
                            logger.warn("aeron.Publication failed to resolve ADMIN_ACTION within {} ms, {}", Long.valueOf(this.adminActionTimeout.toMillis()), this);
                            agentTerminationException = new AgentTerminationException("Failed to resolve ADMIN_ACTION within timeout");
                        }
                        if (agentTerminationException != null) {
                            break;
                        }
                    } else {
                        i++;
                        publisherProcessor.reset();
                    }
                } catch (Exception e) {
                    publisherProcessor.cancelDueTo(e);
                    publisherProcessor.removeSelf();
                }
            }
        }
        if (agentTerminationException == null) {
            return i;
        }
        this.lastError = agentTerminationException;
        throw agentTerminationException;
    }

    public void onClose() {
        this.isDisposed = true;
        CloseHelper.quietClose(this.publication);
        PublisherProcessor[] publisherProcessorArr = this.publisherProcessors;
        this.publisherProcessors = new PublisherProcessor[0];
        Throwable th = (Throwable) Optional.ofNullable(this.lastError).orElse(new AgentTerminationException("PublisherProcessor has been cancelled"));
        for (PublisherProcessor publisherProcessor : publisherProcessorArr) {
            publisherProcessor.cancelDueTo(th);
        }
        this.onDispose.onComplete();
    }

    public String roleName() {
        return PublicationAgent.class.getName() + ":" + this.publication.sessionId();
    }

    public void dispose() {
        this.isDisposed = true;
    }

    public boolean isDisposed() {
        return this.onDispose.isDisposed();
    }

    @Override // reactor.aeron.AeronOutbound
    public <B> AeronOutbound send(Publisher<B> publisher, DirectBufferHandler<? super B> directBufferHandler) {
        return then(publish(publisher, directBufferHandler));
    }

    private <B> Mono<Void> publish(Publisher<B> publisher, DirectBufferHandler<? super B> directBufferHandler) {
        return Mono.defer(() -> {
            PublisherProcessor publisherProcessor = new PublisherProcessor(directBufferHandler, this);
            publisher.subscribe(publisherProcessor);
            return publisherProcessor.onDispose();
        });
    }

    @Override // reactor.aeron.OnDisposable
    public Mono<Void> onDispose() {
        return this.onDispose;
    }
}
