package reactor.aeron;

import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Operators;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:reactor/aeron/AeronWriteSequencer.class */
public final class AeronWriteSequencer {
    private static final int PREFETCH = 32;
    private final Publisher<?> dataPublisher;
    private final long sessionId;
    private final MessagePublication publication;
    private final PublisherSender inner;
    private volatile boolean completed;
    private final MonoProcessor<Void> newPromise;

    /* loaded from: input_file:reactor/aeron/AeronWriteSequencer$PublisherSender.class */
    static class PublisherSender implements CoreSubscriber<Object>, Subscription {
        static final AtomicReferenceFieldUpdater<PublisherSender, Subscription> MISSED_SUBSCRIPTION = AtomicReferenceFieldUpdater.newUpdater(PublisherSender.class, Subscription.class, "missedSubscription");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_REQUESTED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedRequested");
        static final AtomicLongFieldUpdater<PublisherSender> MISSED_PRODUCED = AtomicLongFieldUpdater.newUpdater(PublisherSender.class, "missedProduced");
        static final AtomicIntegerFieldUpdater<PublisherSender> WIP = AtomicIntegerFieldUpdater.newUpdater(PublisherSender.class, "wip");
        private final AeronWriteSequencer parent;
        private final long sessionId;
        private final MessagePublication publication;
        private volatile Subscription missedSubscription;
        private volatile long missedRequested;
        private volatile long missedProduced;
        private volatile int wip;
        private boolean inactive;
        private long requested;
        private boolean unbounded;
        private Subscription actual;
        private long produced;

        PublisherSender(AeronWriteSequencer aeronWriteSequencer, MessagePublication messagePublication, long j) {
            this.parent = aeronWriteSequencer;
            this.sessionId = j;
            this.publication = messagePublication;
        }

        public final void cancel() {
            if (this.inactive) {
                return;
            }
            this.inactive = true;
            drain();
        }

        public void onComplete() {
            long j = this.produced;
            this.produced = 0L;
            produced(j);
            this.parent.completed = true;
        }

        public void onError(Throwable th) {
            long j = this.produced;
            this.produced = 0L;
            produced(j);
            this.parent.newPromise.onError(th);
        }

        public void onNext(Object obj) {
            this.produced++;
            this.publication.enqueue((ByteBuffer) obj).doOnSuccess(r5 -> {
                if (this.parent.completed) {
                    this.parent.newPromise.onComplete();
                } else {
                    request(1L);
                }
            }).subscribe((Consumer) null, th -> {
                cancel();
                this.parent.newPromise.onError(new Exception("Failed to publish signal into session: " + this.sessionId, th));
            });
        }

        public void onSubscribe(Subscription subscription) {
            if (this.inactive) {
                subscription.cancel();
                return;
            }
            Objects.requireNonNull(subscription);
            if (this.wip != 0 || !WIP.compareAndSet(this, 0, 1)) {
                MISSED_SUBSCRIPTION.set(this, subscription);
                drain();
                return;
            }
            this.actual = subscription;
            request(32L);
            long j = this.requested;
            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
            if (j != 0) {
                subscription.request(j);
            }
        }

        public final void request(long j) {
            if (!Operators.validate(j) || this.unbounded) {
                return;
            }
            if (this.wip != 0 || !WIP.compareAndSet(this, 0, 1)) {
                Operators.addCap(MISSED_REQUESTED, this, j);
                drain();
                return;
            }
            long j2 = this.requested;
            if (j2 != Long.MAX_VALUE) {
                long addCap = Operators.addCap(j2, j);
                this.requested = addCap;
                if (addCap == Long.MAX_VALUE) {
                    this.unbounded = true;
                }
            }
            Subscription subscription = this.actual;
            if (WIP.decrementAndGet(this) != 0) {
                drainLoop();
            }
            if (subscription != null) {
                subscription.request(j);
            }
        }

        final void drain() {
            if (WIP.getAndIncrement(this) != 0) {
                return;
            }
            drainLoop();
        }

        final void drainLoop() {
            Subscription subscription;
            int i = 1;
            long j = 0;
            Subscription subscription2 = null;
            do {
                Subscription subscription3 = this.missedSubscription;
                if (subscription3 != null) {
                    subscription3 = MISSED_SUBSCRIPTION.getAndSet(this, null);
                    if (subscription3 == Operators.cancelledSubscription() && (subscription = this.actual) != null) {
                        subscription.cancel();
                        this.actual = null;
                    }
                }
                long j2 = this.missedRequested;
                if (j2 != 0) {
                    j2 = MISSED_REQUESTED.getAndSet(this, 0L);
                }
                long j3 = this.missedProduced;
                if (j3 != 0) {
                    j3 = MISSED_PRODUCED.getAndSet(this, 0L);
                }
                Subscription subscription4 = this.actual;
                if (this.inactive) {
                    if (subscription4 != null) {
                        subscription4.cancel();
                        this.actual = null;
                    }
                    if (subscription3 != null) {
                        subscription3.cancel();
                    }
                } else {
                    long j4 = this.requested;
                    if (j4 != Long.MAX_VALUE) {
                        long addCap = Operators.addCap(j4, j2);
                        if (addCap != Long.MAX_VALUE) {
                            long j5 = addCap - j3;
                            if (j5 < 0) {
                                Operators.reportMoreProduced();
                                j5 = 0;
                            }
                            j4 = j5;
                        } else {
                            j4 = addCap;
                        }
                        this.requested = j4;
                    }
                    if (subscription3 != null) {
                        this.actual = subscription3;
                        if (j4 != 0) {
                            j = Operators.addCap(j, j4);
                            subscription2 = subscription3;
                        }
                    } else if (j2 != 0 && subscription4 != null) {
                        j = Operators.addCap(j, j2);
                        subscription2 = subscription4;
                    }
                }
                i = WIP.addAndGet(this, -i);
            } while (i != 0);
            if (j != 0) {
                subscription2.request(j);
            }
        }

        final void produced(long j) {
            if (this.unbounded) {
                return;
            }
            if (this.wip != 0 || !WIP.compareAndSet(this, 0, 1)) {
                Operators.addCap(MISSED_PRODUCED, this, j);
                drain();
                return;
            }
            long j2 = this.requested;
            if (j2 != Long.MAX_VALUE) {
                long j3 = j2 - j;
                if (j3 < 0) {
                    Operators.reportMoreProduced();
                    j3 = 0;
                }
                this.requested = j3;
            } else {
                this.unbounded = true;
            }
            if (WIP.decrementAndGet(this) == 0) {
                return;
            }
            drainLoop();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AeronWriteSequencer(long j, MessagePublication messagePublication) {
        this.sessionId = j;
        this.publication = (MessagePublication) Objects.requireNonNull(messagePublication, "message publication must be present");
        this.dataPublisher = null;
        this.inner = null;
        this.newPromise = null;
    }

    private AeronWriteSequencer(long j, MessagePublication messagePublication, Publisher<?> publisher) {
        this.sessionId = j;
        this.publication = messagePublication;
        this.dataPublisher = publisher;
        this.newPromise = MonoProcessor.create();
        this.inner = new PublisherSender(this, messagePublication, j);
    }

    public Mono<Void> write(Publisher<?> publisher) {
        Objects.requireNonNull(publisher, "dataPublisher must be not null");
        return new AeronWriteSequencer(this.sessionId, this.publication, publisher).write0();
    }

    private Mono<Void> write0() {
        return Mono.fromRunnable(() -> {
            this.dataPublisher.subscribe(this.inner);
        }).then(this.newPromise).takeUntilOther(this.publication.onDispose()).doFinally(signalType -> {
            this.inner.cancel();
        });
    }
}
