package org.neo4j.driver.integration.reactive;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.neo4j.driver.Session;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.DatabaseException;
import org.neo4j.driver.exceptions.ServiceUnavailableException;
import org.neo4j.driver.exceptions.SessionExpiredException;
import org.neo4j.driver.exceptions.TransientException;
import org.neo4j.driver.internal.util.EnabledOnNeo4jWith;
import org.neo4j.driver.internal.util.Neo4jFeature;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.neo4j.driver.reactive.RxTransactionWork;
import org.neo4j.driver.util.DatabaseExtension;
import org.neo4j.driver.util.ParallelizableIT;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@EnabledOnNeo4jWith(Neo4jFeature.BOLT_V4)
@ParallelizableIT
/* loaded from: input_file:org/neo4j/driver/integration/reactive/RxSessionIT.class */
class RxSessionIT {

    @RegisterExtension
    static final DatabaseExtension neo4j = new DatabaseExtension();

    /* loaded from: input_file:org/neo4j/driver/integration/reactive/RxSessionIT$InvocationTrackingWork.class */
    private static class InvocationTrackingWork implements RxTransactionWork<Publisher<Integer>> {
        final String query;
        Iterator<RuntimeException> asyncFailures = Collections.emptyIterator();
        Iterator<RuntimeException> syncFailures = Collections.emptyIterator();
        final AtomicInteger invocationCount = new AtomicInteger();

        InvocationTrackingWork(String str) {
            this.query = str;
        }

        InvocationTrackingWork withAsyncFailures(RuntimeException... runtimeExceptionArr) {
            this.asyncFailures = Arrays.asList(runtimeExceptionArr).iterator();
            return this;
        }

        InvocationTrackingWork withSyncFailures(RuntimeException... runtimeExceptionArr) {
            this.syncFailures = Arrays.asList(runtimeExceptionArr).iterator();
            return this;
        }

        int invocationCount() {
            return this.invocationCount.get();
        }

        /* renamed from: execute, reason: merged with bridge method [inline-methods] */
        public Publisher<Integer> m48execute(RxTransaction rxTransaction) {
            this.invocationCount.incrementAndGet();
            if (this.syncFailures.hasNext()) {
                throw this.syncFailures.next();
            }
            return this.asyncFailures.hasNext() ? Mono.error(this.asyncFailures.next()) : Flux.from(rxTransaction.run(this.query).records()).map(record -> {
                return Integer.valueOf(record.get(0).asInt());
            });
        }
    }

    RxSessionIT() {
    }

    @Test
    void shouldAllowSessionRun() {
        StepVerifier.create(Flux.from(neo4j.driver().rxSession().run("UNWIND [1,2,3,4] AS a RETURN a").records()).map(record -> {
            return Integer.valueOf(record.get("a").asInt());
        })).expectNext(1).expectNext(2).expectNext(3).expectNext(4).expectComplete().verify();
    }

    @Test
    void shouldBeAbleToReuseSessionAfterFailure() {
        RxSession rxSession = neo4j.driver().rxSession();
        StepVerifier.create(rxSession.run("INVALID").records()).expectError(ClientException.class).verify();
        StepVerifier.create(rxSession.run("RETURN 1").records()).assertNext(record -> {
            Assertions.assertEquals(record.get("1").asLong(), 1L);
        }).expectComplete().verify();
    }

    @Test
    void shouldRunAsyncTransactionWithoutRetries() {
        RxSession rxSession = neo4j.driver().rxSession();
        InvocationTrackingWork invocationTrackingWork = new InvocationTrackingWork("CREATE (:Apa) RETURN 42");
        StepVerifier.create(rxSession.writeTransaction(invocationTrackingWork)).expectNext(42).verifyComplete();
        Assertions.assertEquals(1, invocationTrackingWork.invocationCount());
        Assertions.assertEquals(1L, countNodesByLabel("Apa"));
    }

    @Test
    void shouldRunAsyncTransactionWithRetriesOnAsyncFailures() {
        RxSession rxSession = neo4j.driver().rxSession();
        InvocationTrackingWork withAsyncFailures = new InvocationTrackingWork("CREATE (:Node) RETURN 24").withAsyncFailures(new ServiceUnavailableException("Oh!"), new SessionExpiredException("Ah!"), new TransientException("Code", "Message"));
        StepVerifier.create(rxSession.writeTransaction(withAsyncFailures)).expectNext(24).verifyComplete();
        Assertions.assertEquals(4, withAsyncFailures.invocationCount());
        Assertions.assertEquals(1L, countNodesByLabel("Node"));
        assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionWithRetriesOnSyncFailures() {
        RxSession rxSession = neo4j.driver().rxSession();
        InvocationTrackingWork withSyncFailures = new InvocationTrackingWork("CREATE (:Test) RETURN 12").withSyncFailures(new TransientException("Oh!", "Deadlock!"), new ServiceUnavailableException("Oh! Network Failure"));
        StepVerifier.create(rxSession.writeTransaction(withSyncFailures)).expectNext(12).verifyComplete();
        Assertions.assertEquals(3, withSyncFailures.invocationCount());
        Assertions.assertEquals(1L, countNodesByLabel("Test"));
        assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionThatCanNotBeRetried() {
        RxSession rxSession = neo4j.driver().rxSession();
        InvocationTrackingWork invocationTrackingWork = new InvocationTrackingWork("UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x");
        StepVerifier.create(rxSession.writeTransaction(invocationTrackingWork)).expectNext(1).expectNext(2).expectErrorSatisfies(th -> {
            MatcherAssert.assertThat(th, CoreMatchers.instanceOf(ClientException.class));
        }).verify();
        Assertions.assertEquals(1, invocationTrackingWork.invocationCount());
        Assertions.assertEquals(0L, countNodesByLabel("Hi"));
        assertNoParallelScheduler();
    }

    @Test
    void shouldRunAsyncTransactionThatCanNotBeRetriedAfterATransientFailure() {
        RxSession rxSession = neo4j.driver().rxSession();
        InvocationTrackingWork withAsyncFailures = new InvocationTrackingWork("CREATE (:Person) RETURN 1").withSyncFailures(new TransientException("Oh!", "Deadlock!")).withAsyncFailures(new DatabaseException("Oh!", "OutOfMemory!"));
        StepVerifier.create(rxSession.writeTransaction(withAsyncFailures)).expectErrorSatisfies(th -> {
            MatcherAssert.assertThat(th, CoreMatchers.instanceOf(DatabaseException.class));
            Assertions.assertEquals(1, th.getSuppressed().length);
            MatcherAssert.assertThat(th.getSuppressed()[0], CoreMatchers.instanceOf(TransientException.class));
        }).verify();
        Assertions.assertEquals(2, withAsyncFailures.invocationCount());
        Assertions.assertEquals(0L, countNodesByLabel("Person"));
        assertNoParallelScheduler();
    }

    private void assertNoParallelScheduler() {
        Iterator<Thread> it = Thread.getAllStackTraces().keySet().iterator();
        while (it.hasNext()) {
            MatcherAssert.assertThat(it.next().getName(), CoreMatchers.not(CoreMatchers.startsWith("parallel")));
        }
    }

    private long countNodesByLabel(String str) {
        Session session = neo4j.driver().session();
        try {
            long asLong = session.run("MATCH (n:" + str + ") RETURN count(n)").single().get(0).asLong();
            if (session != null) {
                session.close();
            }
            return asLong;
        } catch (Throwable th) {
            if (session != null) {
                try {
                    session.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
