package reactor.test.subscriber;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscription;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.test.subscriber.TestSubscriber;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/* loaded from: input_file:BOOT-INF/lib/reactor-test-3.4.18.jar:reactor/test/subscriber/DefaultTestSubscriber.class */
class DefaultTestSubscriber<T> implements TestSubscriber<T> {
    final long initialRequest;
    final Context context;
    final TestSubscriber.FusionRequirement fusionRequirement;
    final int requestedFusionMode;
    final int expectedFusionMode;
    Subscription s;

    @Nullable
    Fuseable.QueueSubscription<T> qs;

    @Nullable
    volatile Signal<T> terminalSignal;
    static final AtomicIntegerFieldUpdater<DefaultTestSubscriber> STATE;
    volatile long requestedTotal;
    static final AtomicLongFieldUpdater<DefaultTestSubscriber> REQUESTED_TOTAL;
    volatile long requestedPreSubscription;
    static final AtomicLongFieldUpdater<DefaultTestSubscriber> REQUESTED_PRE_SUBSCRIPTION;
    static final int MASK_TERMINATED = 8;
    static final int MASK_TERMINATING = 4;
    static final int MASK_ON_NEXT = 1;
    static final /* synthetic */ boolean $assertionsDisabled;
    int fusionMode = -1;
    final AtomicBoolean cancelled = new AtomicBoolean();
    final List<T> receivedOnNext = new CopyOnWriteArrayList();
    final List<T> receivedPostCancellation = new CopyOnWriteArrayList();
    final List<Signal<T>> protocolErrors = new CopyOnWriteArrayList();
    volatile int state = 0;
    final CountDownLatch doneLatch = new CountDownLatch(1);
    final AtomicReference<AssertionError> subscriptionFailure = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public DefaultTestSubscriber(TestSubscriberBuilder testSubscriberBuilder) {
        this.initialRequest = testSubscriberBuilder.initialRequest;
        this.context = testSubscriberBuilder.context;
        this.fusionRequirement = testSubscriberBuilder.fusionRequirement;
        this.requestedFusionMode = testSubscriberBuilder.requestedFusionMode;
        this.expectedFusionMode = testSubscriberBuilder.expectedFusionMode;
        REQUESTED_PRE_SUBSCRIPTION.lazySet(this, this.initialRequest);
    }

    @Override // reactor.core.CoreSubscriber
    public Context currentContext() {
        return this.context;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void internalCancel() {
        Subscription subscription = this.s;
        if (!this.cancelled.compareAndSet(false, true) || subscription == null) {
            return;
        }
        subscription.cancel();
        safeClearQueue(subscription);
    }

    void safeClearQueue(@Nullable Subscription subscription) {
        if (subscription instanceof Fuseable.QueueSubscription) {
            ((Fuseable.QueueSubscription) subscription).clear();
        }
    }

    void subscriptionFail(String str) {
        if (this.subscriptionFailure.compareAndSet(null, new AssertionError(str))) {
            internalCancel();
            notifyDone();
        }
    }

    final void notifyDone() {
        this.doneLatch.countDown();
    }

    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (this.cancelled.get()) {
            subscription.cancel();
            safeClearQueue(subscription);
            return;
        }
        if (!Operators.validate(this.s, subscription)) {
            safeClearQueue(subscription);
            subscriptionFail("TestSubscriber must not be reused, but Subscription has already been set.");
            return;
        }
        this.s = subscription;
        this.fusionMode = -1;
        if (!(subscription instanceof Fuseable.QueueSubscription)) {
            if (this.fusionRequirement == TestSubscriber.FusionRequirement.FUSEABLE) {
                subscriptionFail("TestSubscriber configured to require QueueSubscription, got " + subscription);
                return;
            } else {
                if (this.initialRequest > 0) {
                    long andSet = REQUESTED_PRE_SUBSCRIPTION.getAndSet(this, -1L);
                    if (andSet > 0) {
                        upstreamRequest(subscription, andSet);
                        return;
                    }
                    return;
                }
                return;
            }
        }
        if (this.fusionRequirement == TestSubscriber.FusionRequirement.NOT_FUSEABLE) {
            subscriptionFail("TestSubscriber configured to reject QueueSubscription, got " + subscription);
            return;
        }
        this.qs = (Fuseable.QueueSubscription) subscription;
        int requestFusion = this.qs.requestFusion(this.requestedFusionMode);
        if (this.expectedFusionMode != requestFusion && this.expectedFusionMode != 3) {
            subscriptionFail("TestSubscriber negotiated fusion mode inconsistent, expected " + Fuseable.fusionModeName(this.expectedFusionMode) + " got " + Fuseable.fusionModeName(requestFusion));
            return;
        }
        this.fusionMode = requestFusion;
        if (requestFusion != 1) {
            long andSet2 = REQUESTED_PRE_SUBSCRIPTION.getAndSet(this, -1L);
            if (andSet2 > 0) {
                upstreamRequest(subscription, andSet2);
                return;
            }
            return;
        }
        while (!this.cancelled.get()) {
            T poll = this.qs.poll();
            if (poll == null) {
                onComplete();
                return;
            }
            onNext(poll);
        }
        safeClearQueue(this.qs);
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(@Nullable T t) {
        int markOnNextStart = markOnNextStart();
        boolean isMarkedTerminated = isMarkedTerminated(markOnNextStart);
        boolean isMarkedOnNext = isMarkedOnNext(markOnNextStart);
        if (isMarkedTerminated || isMarkedOnNext) {
            if (t != null) {
                this.protocolErrors.add(Signal.next(t));
                return;
            } else if (isMarkedTerminated) {
                this.protocolErrors.add(Signal.error(new AssertionError("onNext(null) received despite SYNC fusion (which has already completed)")));
                return;
            } else {
                this.protocolErrors.add(Signal.error(new AssertionError("onNext(null) received despite SYNC fusion (with concurrent onNext)")));
                return;
            }
        }
        if (t == null) {
            if (this.fusionMode == 2) {
                drainAsync(false);
                return;
            }
            subscriptionFail("onNext(null) received while ASYNC fusion not established");
        }
        this.receivedOnNext.add(t);
        if (this.cancelled.get()) {
            this.receivedPostCancellation.add(t);
        }
        checkTerminatedAfterOnNext();
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        Signal<T> complete = Signal.complete();
        int markTerminated = markTerminated();
        if (isMarkedTerminated(markTerminated) || isMarkedTerminating(markTerminated)) {
            this.protocolErrors.add(complete);
            return;
        }
        if (isMarkedOnNext(markTerminated)) {
            this.protocolErrors.add(complete);
            this.terminalSignal = complete;
            return;
        }
        this.terminalSignal = complete;
        if (this.fusionMode == 2) {
            drainAsync(true);
        } else {
            notifyDone();
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        Signal<T> error = Signal.error(th);
        int markTerminated = markTerminated();
        if (isMarkedTerminated(markTerminated) || isMarkedTerminating(markTerminated)) {
            this.protocolErrors.add(error);
            return;
        }
        if (isMarkedOnNext(markTerminated)) {
            this.protocolErrors.add(error);
            this.terminalSignal = error;
            return;
        }
        this.terminalSignal = error;
        if (this.fusionMode == 2) {
            drainAsync(true);
        } else {
            notifyDone();
        }
    }

    void drainAsync(boolean z) {
        if (!$assertionsDisabled && this.qs == null) {
            throw new AssertionError();
        }
        int i = this.state;
        if (z && isMarkedOnNext(i)) {
            return;
        }
        if (isMarkedTerminated(i)) {
            safeClearQueue(this.qs);
            notifyDone();
            return;
        }
        while (!this.cancelled.get()) {
            long j = REQUESTED_TOTAL.get(this);
            if (j != Long.MAX_VALUE && j - this.receivedOnNext.size() < 1) {
                if (checkTerminatedAfterOnNext()) {
                    safeClearQueue(this.qs);
                    return;
                }
                return;
            } else {
                T poll = this.qs.poll();
                if (poll == null) {
                    if (checkTerminatedAfterOnNext()) {
                        safeClearQueue(this.qs);
                        return;
                    }
                    return;
                }
                this.receivedOnNext.add(poll);
            }
        }
        safeClearQueue(this.qs);
        notifyDone();
    }

    @Override // reactor.core.Scannable
    @Nullable
    public Object scanUnsafe(Scannable.Attr attr) {
        if (attr == Scannable.Attr.TERMINATED) {
            return Boolean.valueOf((this.terminalSignal == null && this.subscriptionFailure.get() == null) ? false : true);
        }
        if (attr == Scannable.Attr.CANCELLED) {
            return Boolean.valueOf(this.cancelled.get());
        }
        if (attr == Scannable.Attr.ERROR) {
            AssertionError assertionError = this.subscriptionFailure.get();
            Signal<T> signal = this.terminalSignal;
            return (signal == null || signal.getThrowable() == null) ? assertionError : signal.getThrowable();
        }
        if (attr == Scannable.Attr.PARENT) {
            return this.s;
        }
        if (attr == Scannable.Attr.RUN_STYLE) {
            return Scannable.Attr.RunStyle.SYNC;
        }
        if (attr == Scannable.Attr.REQUESTED_FROM_DOWNSTREAM) {
            return Long.valueOf(REQUESTED_TOTAL.get(this));
        }
        return null;
    }

    void upstreamRequest(Subscription subscription, long j) {
        if (Operators.addCap(REQUESTED_TOTAL, this, j) != Long.MAX_VALUE) {
            subscription.request(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean checkTerminatedAfterOnNext() {
        if (!isMarkedTerminating(markOnNextDone())) {
            return false;
        }
        notifyDone();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isMarkedTerminated(int i) {
        return (i & 8) == 8;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isMarkedOnNext(int i) {
        return (i & 1) == 1;
    }

    static boolean isMarkedTerminating(int i) {
        return (i & 4) == 4 && (i & 8) != 8;
    }

    int markTerminated() {
        int i;
        do {
            i = this.state;
            if (isMarkedTerminated(i) || isMarkedTerminating(i)) {
                return i;
            }
        } while (!STATE.compareAndSet(this, i, isMarkedOnNext(i) ? i | 4 : 8));
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int markOnNextStart() {
        int i;
        do {
            i = this.state;
            if (i != 0) {
                return i;
            }
        } while (!STATE.compareAndSet(this, i, 1));
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int markOnNextDone() {
        int i;
        do {
            i = this.state;
        } while (!STATE.compareAndSet(this, i, i & (-2)));
        return i;
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public void cancel() {
        if (this.cancelled.compareAndSet(false, true)) {
            if (this.s != null) {
                this.s.cancel();
            }
            if (this.requestedFusionMode == 2) {
                int i = this.state;
                Fuseable.QueueSubscription<T> queueSubscription = this.qs;
                if (!isMarkedOnNext(i) && queueSubscription != null) {
                    queueSubscription.clear();
                }
            }
            notifyDone();
        }
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public void request(long j) {
        long j2;
        if (this.s != null) {
            if (Operators.validate(j)) {
                if (this.fusionMode == 1) {
                    internalCancel();
                    throw new IllegalStateException("Request is short circuited in SYNC fusion mode, and should not be explicitly used");
                }
                upstreamRequest(this.s, j);
                return;
            }
            return;
        }
        do {
            j2 = REQUESTED_PRE_SUBSCRIPTION.get(this);
            if (j2 == -1) {
                request(j);
                return;
            }
        } while (!REQUESTED_PRE_SUBSCRIPTION.compareAndSet(this, j2, Operators.addCap(j2, j)));
    }

    void checkSubscriptionFailure() {
        AssertionError assertionError = this.subscriptionFailure.get();
        if (assertionError != null) {
            throw assertionError;
        }
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public boolean isTerminatedOrCancelled() {
        checkSubscriptionFailure();
        return this.doneLatch.getCount() == 0;
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public boolean isTerminated() {
        checkSubscriptionFailure();
        return this.terminalSignal != null;
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public boolean isTerminatedComplete() {
        checkSubscriptionFailure();
        Signal<T> signal = this.terminalSignal;
        return signal != null && signal.isOnComplete();
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public boolean isTerminatedError() {
        checkSubscriptionFailure();
        Signal<T> signal = this.terminalSignal;
        return signal != null && signal.isOnError();
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public boolean isCancelled() {
        checkSubscriptionFailure();
        return this.cancelled.get();
    }

    @Override // reactor.test.subscriber.TestSubscriber
    @Nullable
    public Signal<T> getTerminalSignal() {
        checkSubscriptionFailure();
        return this.terminalSignal;
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public Signal<T> expectTerminalSignal() {
        checkSubscriptionFailure();
        Signal<T> signal = this.terminalSignal;
        if (signal != null && (signal.isOnError() || signal.isOnComplete())) {
            return signal;
        }
        cancel();
        throw new AssertionError("Expected subscriber to be terminated, but it has not been terminated yet: cancelling subscription.");
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public Throwable expectTerminalError() {
        checkSubscriptionFailure();
        Signal<T> signal = this.terminalSignal;
        if (signal == null) {
            cancel();
            throw new AssertionError("Expected subscriber to have errored, but it has not been terminated yet.");
        }
        if (signal.isOnComplete()) {
            throw new AssertionError("Expected subscriber to have errored, but it has completed instead.");
        }
        Throwable throwable = signal.getThrowable();
        if (throwable != null) {
            return throwable;
        }
        cancel();
        throw new AssertionError("Expected subscriber to have errored, got unexpected terminal signal <" + signal + ">.");
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public List<T> getReceivedOnNext() {
        checkSubscriptionFailure();
        return new ArrayList(this.receivedOnNext);
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public List<T> getReceivedOnNextAfterCancellation() {
        checkSubscriptionFailure();
        return new ArrayList(this.receivedPostCancellation);
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public List<Signal<T>> getProtocolErrors() {
        checkSubscriptionFailure();
        return new ArrayList(this.protocolErrors);
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public int getFusionMode() {
        checkSubscriptionFailure();
        return this.fusionMode;
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public void block() {
        try {
            this.doneLatch.await();
            checkSubscriptionFailure();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Block() interrupted", e);
        }
    }

    @Override // reactor.test.subscriber.TestSubscriber
    public void block(Duration duration) {
        long millis = duration.toMillis();
        try {
            boolean await = this.doneLatch.await(millis, TimeUnit.MILLISECONDS);
            checkSubscriptionFailure();
            if (await) {
            } else {
                throw new AssertionError("TestSubscriber timed out, not terminated after " + duration + " (" + millis + "ms)");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Block(" + duration + ") interrupted", e);
        }
    }

    static {
        $assertionsDisabled = !DefaultTestSubscriber.class.desiredAssertionStatus();
        STATE = AtomicIntegerFieldUpdater.newUpdater(DefaultTestSubscriber.class, "state");
        REQUESTED_TOTAL = AtomicLongFieldUpdater.newUpdater(DefaultTestSubscriber.class, "requestedTotal");
        REQUESTED_PRE_SUBSCRIPTION = AtomicLongFieldUpdater.newUpdater(DefaultTestSubscriber.class, "requestedPreSubscription");
    }
}
