package org.infinispan.util.concurrent;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.AsyncProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.infinispan.commons.test.BlockHoundHelper;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.Mocks;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "util.concurrent.BlockingManagerTest")
/* loaded from: input_file:org/infinispan/util/concurrent/BlockingManagerTest.class */
public class BlockingManagerTest extends AbstractInfinispanTest {
    Executor nonBlockingExecutor;
    Executor blockingExecutor;

    public void initializeMocks() {
        this.nonBlockingExecutor = (Executor) Mockito.mock(Executor.class, Mockito.withSettings().defaultAnswer(Mocks.runWithExecutorAnswer(BlockHoundHelper.ensureNonBlockingExecutor())));
        this.blockingExecutor = (Executor) Mockito.mock(Executor.class, Mockito.withSettings().defaultAnswer(Mocks.runWithExecutorAnswer(BlockHoundHelper.allowBlockingExecutor())));
    }

    private BlockingManager createBlockingManager(final boolean z) {
        initializeMocks();
        BlockingManagerImpl blockingManagerImpl = new BlockingManagerImpl(this) { // from class: org.infinispan.util.concurrent.BlockingManagerTest.1
            final /* synthetic */ BlockingManagerTest this$0;

            {
                this.this$0 = this;
            }

            protected boolean isCurrentThreadBlocking() {
                return z;
            }
        };
        blockingManagerImpl.nonBlockingExecutor = this.nonBlockingExecutor;
        blockingManagerImpl.blockingExecutor = this.blockingExecutor;
        blockingManagerImpl.start();
        return blockingManagerImpl;
    }

    private static <E> Flowable<E> ensureUserPublisherSubscribeCompleteOnBlockingThread(Publisher<E> publisher) {
        return Flowable.fromPublisher(publisher).doOnSubscribe(subscription -> {
            AssertJUnit.assertFalse(BlockHoundHelper.currentThreadRequiresNonBlocking());
        }).doOnComplete(() -> {
            AssertJUnit.assertFalse(BlockHoundHelper.currentThreadRequiresNonBlocking());
        });
    }

    private static <E> Flowable<E> ensurePublisherValuesOnNonBlockingThread(Publisher<E> publisher) {
        return Flowable.fromPublisher(publisher).doOnNext(obj -> {
            AssertJUnit.assertTrue(BlockHoundHelper.currentThreadRequiresNonBlocking());
        }).doOnComplete(() -> {
            AssertJUnit.assertTrue(BlockHoundHelper.currentThreadRequiresNonBlocking());
        });
    }

    public void testBlockingPublishToVoidStageInvokedBlockingThread() {
        AssertJUnit.assertTrue(CompletionStages.isCompletedSuccessfully(createBlockingManager(true).blockingPublisherToVoidStage(Flowable.fromArray(new Object[]{1, 2, 3}).doOnNext(BlockHoundHelper::blockingConsume), (Object) null)));
        Mockito.verifyNoInteractions(new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublishToVoidStageInvokedNonBlockingThread() {
        AssertJUnit.assertTrue(CompletionStages.isCompletedSuccessfully(createBlockingManager(false).blockingPublisherToVoidStage(Flowable.just(1).doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }), (Object) null)));
        ((Executor) Mockito.verify(this.blockingExecutor)).execute((Runnable) Mockito.any());
        Mockito.verifyNoInteractions(new Object[]{this.nonBlockingExecutor});
    }

    public void testBlockingPublishToVoidStageInvokedNonBlockingThreadCompleteAfterSubscribe() {
        BlockingManager createBlockingManager = createBlockingManager(false);
        AsyncProcessor create = AsyncProcessor.create();
        create.onNext(1);
        CompletionStage blockingPublisherToVoidStage = createBlockingManager.blockingPublisherToVoidStage(create.doOnNext(BlockHoundHelper::blockingConsume), (Object) null);
        AssertJUnit.assertFalse(CompletionStages.isCompletedSuccessfully(blockingPublisherToVoidStage));
        create.onComplete();
        AssertJUnit.assertTrue(CompletionStages.isCompletedSuccessfully(blockingPublisherToVoidStage));
        ((Executor) Mockito.verify(this.blockingExecutor)).execute((Runnable) Mockito.any());
        ((Executor) Mockito.verify(this.nonBlockingExecutor)).execute((Runnable) Mockito.any());
    }

    public void testBlockingPublisherInvokedBlockingThread() {
        Publisher blockingPublisher = createBlockingManager(true).blockingPublisher(Flowable.just(1).doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }));
        TestSubscriber create = TestSubscriber.create();
        blockingPublisher.subscribe(create);
        create.assertComplete();
        Mockito.verifyNoInteractions(new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublisherInvokedBlockingThreadCompleteAfterSubscribe() {
        BlockingManager createBlockingManager = createBlockingManager(true);
        AsyncProcessor create = AsyncProcessor.create();
        create.onNext(1);
        Publisher blockingPublisher = createBlockingManager.blockingPublisher(create.doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }));
        TestSubscriber create2 = TestSubscriber.create();
        blockingPublisher.subscribe(create2);
        create2.assertNotComplete();
        create.onComplete();
        create2.assertComplete();
        Mockito.verifyNoInteractions(new Object[]{this.nonBlockingExecutor, this.blockingExecutor});
    }

    public void testBlockingPublisherInvokedNonBlockingThread() {
        Publisher blockingPublisher = createBlockingManager(false).blockingPublisher(ensureUserPublisherSubscribeCompleteOnBlockingThread(Flowable.just(1)).doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }));
        TestSubscriber create = TestSubscriber.create();
        ensurePublisherValuesOnNonBlockingThread(blockingPublisher).subscribe(create);
        create.assertComplete();
        ((Executor) Mockito.verify(this.blockingExecutor)).execute((Runnable) Mockito.any());
        ((Executor) Mockito.verify(this.nonBlockingExecutor, Mockito.times(3))).execute((Runnable) Mockito.any());
    }

    public void testBlockingPublisherInvokedNonBlockingThreadCompleteAfterSubscribe() {
        BlockingManager createBlockingManager = createBlockingManager(false);
        UnicastProcessor create = UnicastProcessor.create();
        create.onNext(1);
        Publisher blockingPublisher = createBlockingManager.blockingPublisher(ensureUserPublisherSubscribeCompleteOnBlockingThread(create).doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }));
        TestSubscriber create2 = TestSubscriber.create();
        ensurePublisherValuesOnNonBlockingThread(blockingPublisher).subscribe(create2);
        create2.assertNotComplete();
        create.onComplete();
        create2.assertComplete();
        ((Executor) Mockito.verify(this.blockingExecutor)).execute((Runnable) Mockito.any());
        ((Executor) Mockito.verify(this.nonBlockingExecutor, Mockito.times(3))).execute((Runnable) Mockito.any());
    }

    public void testBlockingPublisherInvokedNonBlockingThreadCancelled() {
        Publisher blockingPublisher = createBlockingManager(false).blockingPublisher(ensureUserPublisherSubscribeCompleteOnBlockingThread(Flowable.range(1, 10)).doOnSubscribe(subscription -> {
            AssertJUnit.assertFalse(BlockHoundHelper.currentThreadRequiresNonBlocking());
        }).doOnComplete(() -> {
            AssertJUnit.assertFalse(BlockHoundHelper.currentThreadRequiresNonBlocking());
        }).doOnNext((v0) -> {
            BlockHoundHelper.blockingConsume(v0);
        }));
        TestSubscriber create = TestSubscriber.create();
        ensurePublisherValuesOnNonBlockingThread(blockingPublisher).take(5).subscribe(create);
        create.assertComplete();
        ((Executor) Mockito.verify(this.blockingExecutor, Mockito.atMost(2))).execute((Runnable) Mockito.any());
        ((Executor) Mockito.verify(this.nonBlockingExecutor, Mockito.times(6))).execute((Runnable) Mockito.any());
    }
}
