package org.axonframework.eventsourcing.eventstore;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.transaction.NoOpTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DefaultEventBusSpanFactory;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.utils.EventStoreTestUtils;
import org.axonframework.eventsourcing.utils.MockException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.LegacyDefaultUnitOfWork;
import org.axonframework.tracing.TestSpanFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.shaded.org.awaitility.core.ConditionFactory;

/**
 * Abstract test suite for {@link LegacyEmbeddedEventStore}. Concrete subclasses supply the
 * {@link LegacyEventStorageEngine} to test against through {@link #createStorageEngine()}.
 */
public abstract class EmbeddedEventStoreTest {
    /** Default value handed to the store builder's {@code cachedEvents} setting. */
    private static final int CACHED_EVENTS = 10;
    /** Default value handed to the store builder's {@code fetchDelay} setting (presumably milliseconds - confirm against the builder). */
    private static final long FETCH_DELAY = 1000;
    /** Default value handed to the store builder's {@code cleanupDelay} setting (presumably milliseconds - confirm against the builder). */
    private static final long CLEANUP_DELAY = 10000;
    /** Presumably the default for the store builder's {@code optimizeEventConsumption} setting - verify against its usages. */
    private static final boolean OPTIMIZE_EVENT_CONSUMPTION = true;
    /** The event store under test; (re)created by {@code newTestSubject}. */
    private LegacyEmbeddedEventStore testSubject;
    /** Transaction manager obtained from {@code getTransactionManager()} during set-up; visible to subclasses. */
    protected TransactionManager transactionManager;
    /** Mockito spy wrapping the engine returned by {@code createStorageEngine()}. */
    private LegacyEventStorageEngine storageEngine;
    /** Mockito spy around an {@code AxonThreadFactory}, handed to the store builder. */
    private ThreadFactory threadFactory;
    /** Span factory used to verify the spans recorded by the store. */
    private TestSpanFactory spanFactory;

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/EmbeddedEventStoreTest$SynchronizedBooleanAnswer.class */
    private static class SynchronizedBooleanAnswer implements Answer<Boolean> {
        private final boolean answer;

        private SynchronizedBooleanAnswer(boolean z) {
            this.answer = z;
        }

        /* renamed from: answer, reason: merged with bridge method [inline-methods] */
        public synchronized Boolean m23answer(InvocationOnMock invocationOnMock) {
            return Boolean.valueOf(this.answer);
        }
    }

    /**
     * Builds fresh collaborators (span factory, transaction manager, spied storage engine and
     * spied thread factory) and a new store under test before every test.
     */
    @BeforeEach
    void setUp() {
        spanFactory = new TestSpanFactory();
        // The transaction manager is resolved before the storage engine is created,
        // matching the order subclasses may rely on.
        transactionManager = getTransactionManager();
        storageEngine = Mockito.spy(createStorageEngine());
        String storeName = LegacyEmbeddedEventStore.class.getSimpleName();
        threadFactory = Mockito.spy(new AxonThreadFactory(storeName));
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, OPTIMIZE_EVENT_CONSUMPTION);
    }

    /**
     * Supplies the storage engine the store under test is built on. The set-up method wraps the
     * returned instance in a Mockito spy.
     *
     * @return the {@link LegacyEventStorageEngine} to run this suite against
     */
    public abstract LegacyEventStorageEngine createStorageEngine();

    /**
     * Creates a new {@link NoOpTransactionManager}, stores it in {@code transactionManager} and
     * returns it.
     *
     * @return the newly created transaction manager
     */
    public TransactionManager getTransactionManager() {
        TransactionManager manager = new NoOpTransactionManager();
        transactionManager = manager;
        return manager;
    }

    /**
     * Replaces the store under test with a newly built one, shutting down the previous instance
     * first when there is one.
     *
     * @param cachedEvents             value for the builder's {@code cachedEvents} setting
     * @param fetchDelay               value for the builder's {@code fetchDelay} setting
     * @param cleanupDelay             value for the builder's {@code cleanupDelay} setting
     * @param optimizeEventConsumption value for the builder's {@code optimizeEventConsumption} setting
     */
    private void newTestSubject(int cachedEvents, long fetchDelay, long cleanupDelay,
                                boolean optimizeEventConsumption) {
        // A plain null check is clearer than wrapping the field in an Optional.
        if (testSubject != null) {
            testSubject.shutDown();
        }
        testSubject = LegacyEmbeddedEventStore.builder()
                .storageEngine(storageEngine)
                .cachedEvents(cachedEvents)
                .fetchDelay(fetchDelay)
                .cleanupDelay(cleanupDelay)
                .threadFactory(threadFactory)
                .optimizeEventConsumption(optimizeEventConsumption)
                .spanFactory(DefaultEventBusSpanFactory.builder().spanFactory(spanFactory).build())
                .build();
    }

    /** Shuts down the store under test after every test. */
    @AfterEach
    void tearDown() {
        testSubject.shutDown();
    }

    /**
     * An event published before a stream is opened must be readable from that stream with the
     * same identifier, payload and aggregate identifier.
     */
    @Test
    void existingEventIsPassedToReader() throws Exception {
        // Declared as a domain event so the aggregate identifier can be compared below.
        DomainEventMessage<?> expected = EventStoreTestUtils.createEvent();
        testSubject.publish(expected);

        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertTrue(eventStream.hasNextAvailable());

        TrackedEventMessage<?> actual = eventStream.nextAvailable();
        Assertions.assertEquals(expected.getIdentifier(), actual.getIdentifier());
        Assertions.assertEquals(expected.getPayload(), actual.getPayload());
        // A tracked message is not statically a domain event: check the runtime type, then cast.
        Assertions.assertTrue(actual instanceof DomainEventMessage);
        Assertions.assertEquals(expected.getAggregateIdentifier(),
                                ((DomainEventMessage<?>) actual).getAggregateIdentifier());
    }

    /**
     * An event published while a reader is blocked on an (initially empty) stream must reach
     * that reader within the test timeout.
     */
    @Timeout(value = 100, unit = TimeUnit.MILLISECONDS)
    @Test
    void eventPublishedAfterOpeningStreamIsPassedToReaderImmediately() throws Exception {
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertFalse(eventStream.hasNextAvailable());
        EventMessage<?> event = EventStoreTestUtils.createEvent();

        // Assertion failures thrown on a helper thread never reach JUnit, so the reader only
        // records what it received and the checks run on the test thread after join().
        List<Object> receivedIdentifiers = new CopyOnWriteArrayList<>();
        Thread reader = new Thread(() -> {
            try {
                receivedIdentifiers.add(eventStream.nextAvailable().getIdentifier());
            } catch (InterruptedException e) {
                // Restore the flag; the empty result list makes the test fail below.
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        testSubject.publish(event);
        reader.join();

        Assertions.assertEquals(1, receivedIdentifiers.size(), "reader thread did not receive an event");
        Assertions.assertEquals(event.getIdentifier(), receivedIdentifiers.get(0));
    }

    /**
     * A reader on an empty store must not receive anything until an event is published.
     */
    @Timeout(5)
    @Test
    void readingIsBlockedWhenStoreIsEmpty() throws Exception {
        // Exactly one event is expected, so the latch starts at 1.
        CountDownLatch eventReceived = new CountDownLatch(1);
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Thread reader = new Thread(
                () -> eventStream.asStream().findFirst().ifPresent(event -> eventReceived.countDown())
        );
        reader.start();

        // Nothing has been published yet: the latch must still be open after the wait.
        Assertions.assertFalse(eventReceived.await(100L, TimeUnit.MILLISECONDS));

        testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertEquals(0L, eventReceived.getCount());
    }

    /**
     * After consuming the single stored event, the reader must wait until a second event is
     * published before it can finish.
     */
    @Timeout(5)
    @Test
    void readingIsBlockedWhenEndOfStreamIsReached() throws Exception {
        CountDownLatch remainingEvents = new CountDownLatch(2);
        testSubject.publish(EventStoreTestUtils.createEvent());
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);

        Thread reader = new Thread(
                () -> eventStream.asStream().limit(2L).forEach(event -> remainingEvents.countDown())
        );
        reader.start();

        // Only the first event is available, so the latch cannot reach zero yet.
        boolean completedEarly = remainingEvents.await(100L, TimeUnit.MILLISECONDS);
        Assertions.assertFalse(completedEarly);
        Assertions.assertEquals(1L, remainingEvents.getCount());

        testSubject.publish(EventStoreTestUtils.createEvent("unique-aggregate-id", 0L));
        reader.join();
        Assertions.assertFalse(reader.isAlive());
        Assertions.assertEquals(0L, remainingEvents.getCount());
    }

    /**
     * Opening a stream with the tracking token of the first event must yield the second event.
     */
    @Timeout(5)
    @Test
    void readingCanBeContinuedUsingLastToken() throws Exception {
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(2);
        testSubject.publish(events);

        TrackedEventMessage<?> first = testSubject.openStream((TrackingToken) null).nextAvailable();
        TrackedEventMessage<?> second = testSubject.openStream(first.trackingToken()).nextAvailable();

        Assertions.assertEquals(events.get(0).getIdentifier(), first.getIdentifier());
        // Index 1 is the second published event.
        Assertions.assertEquals(events.get(1).getIdentifier(), second.getIdentifier());
    }

    /**
     * An event that a tailing reader has already consumed must be returned as the very same
     * instance when it is read again through a second stream.
     */
    @Timeout(5)
    @Test
    void eventIsFetchedFromCacheWhenFetchedASecondTime() throws Exception {
        CountDownLatch remainingEvents = new CountDownLatch(2);
        List<TrackedEventMessage<?>> consumed = new CopyOnWriteArrayList<>();
        Thread reader = new Thread(
                () -> testSubject.openStream((TrackingToken) null).asStream().limit(2L).forEach(event -> {
                    remainingEvents.countDown();
                    consumed.add(event);
                })
        );
        reader.start();

        // Nothing has been published yet, so the reader cannot have finished.
        Assertions.assertFalse(remainingEvents.await(100L, TimeUnit.MILLISECONDS));

        testSubject.publish(EventStoreTestUtils.createEvents(2));
        reader.join();
        Assertions.assertFalse(reader.isAlive());

        // Re-read the second event (index 1), starting from the first event's token.
        TrackedEventMessage<?> refetched =
                testSubject.openStream(consumed.get(0).trackingToken()).nextAvailable();
        Assertions.assertSame(consumed.get(1), refetched);
    }

    /**
     * Events appended directly to the storage engine (bypassing the store's publish method) must
     * still reach a blocked reader.
     */
    @Timeout(5)
    @Test
    void periodicPollingWhenEventStorageIsUpdatedIndependently() throws Exception {
        // Rebuild the store with a fetch delay of 20 instead of the default.
        newTestSubject(CACHED_EVENTS, 20L, CLEANUP_DELAY, true);
        // Exactly one event is expected, so the latch starts at 1.
        CountDownLatch eventReceived = new CountDownLatch(1);
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Thread reader = new Thread(
                () -> eventStream.asStream().findFirst().ifPresent(event -> eventReceived.countDown())
        );
        reader.start();

        Assertions.assertFalse(eventReceived.await(100L, TimeUnit.MILLISECONDS));

        // Append straight to the engine rather than through the store under test.
        storageEngine.appendEvents(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertFalse(reader.isAlive());
        Assertions.assertTrue(eventReceived.await(100L, TimeUnit.MILLISECONDS));
    }

    /**
     * Verifies that the first event is served without touching the storage engine and that,
     * after more events are published and a short pause, the stream reads from the engine again
     * starting at the last consumed token.
     */
    @Timeout(5)
    @Test
    void consumerStopsTailingWhenItFallsBehindTheCache() throws Exception {
        // Rebuild the store with a cleanup delay of 20 instead of the default.
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, 20L, true);
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertFalse(eventStream.hasNextAvailable());

        testSubject.publish(EventStoreTestUtils.createEvents(CACHED_EVENTS));
        Awaitility.await()
                  .pollDelay(Duration.ofMillis(50L))
                  .atMost(Duration.ofMillis(500L))
                  .until(eventStream::hasNextAvailable);

        // Forget earlier interactions so that only the next read is verified.
        Mockito.reset(storageEngine);
        TrackedEventMessage<?> firstEvent = eventStream.nextAvailable();
        Mockito.verifyNoInteractions(storageEngine);

        testSubject.publish(EventStoreTestUtils.createEvent(10L), EventStoreTestUtils.createEvent(11L));
        Thread.sleep(100L);

        Mockito.reset(storageEngine);
        Assertions.assertTrue(eventStream.hasNextAvailable());
        Mockito.verify(storageEngine).readEvents(firstEvent.trackingToken(), false);
    }

    /**
     * Without a snapshot, reading an aggregate's events returns all 110 published events, the
     * last one having sequence number 109.
     */
    @Test
    void loadWithoutSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(110, loaded.size());
        // Last element is at size() - 1.
        Assertions.assertEquals(109L, loaded.get(loaded.size() - 1).getSequenceNumber());
    }

    /**
     * With a snapshot stored at sequence 30, reading the aggregate returns 80 entries: the first
     * has sequence number 30 and the last 109.
     */
    @Test
    void loadWithSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));
        transactionManager.executeInTransaction(
                () -> storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(aggregateId, 30L))
        );

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(80, loaded.size());
        Assertions.assertEquals(30L, loaded.get(0).getSequenceNumber());
        // Last element is at size() - 1.
        Assertions.assertEquals(109L, loaded.get(loaded.size() - 1).getSequenceNumber());
    }

    /**
     * Stubs the storage engine so that its event iterator first reports no elements and
     * afterwards reports elements carrying a fixed token, then checks that the open stream
     * reports no available event both before and after an event is published.
     */
    @Test
    void streamEventsShouldNotReturnDuplicateTokens() throws InterruptedException {
        newTestSubject(0, FETCH_DELAY, FETCH_DELAY, true);

        // Raw types are kept deliberately: the stubbing below relies on unchecked conversions.
        Stream mockedStream = Mockito.mock(Stream.class);
        Iterator mockedIterator = Mockito.mock(Iterator.class);
        Mockito.when(mockedStream.iterator()).thenReturn(mockedIterator);
        Mockito.when(storageEngine.readEvents((TrackingToken) Mockito.any(TrackingToken.class), Mockito.eq(false)))
               .thenReturn(mockedStream);
        // First hasNext() call answers false, every later call answers true.
        Mockito.when(mockedIterator.hasNext())
               .thenAnswer(new SynchronizedBooleanAnswer(false))
               .thenAnswer(new SynchronizedBooleanAnswer(true));
        Mockito.when((GenericTrackedEventMessage) mockedIterator.next())
               .thenReturn(new GenericTrackedEventMessage(new GlobalSequenceTrackingToken(1L),
                                                          EventStoreTestUtils.createEvent()));

        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertFalse(eventStream.hasNextAvailable());

        testSubject.publish(EventStoreTestUtils.createEvent());
        Thread.sleep(200L);
        Assertions.assertFalse(eventStream.hasNextAvailable());
    }

    /**
     * When reading the snapshot throws, the store must still return all 110 events, from
     * sequence number 0 through 109.
     */
    @Test
    void loadWithFailingSnapshot() {
        String aggregateId = UUID.randomUUID().toString();
        testSubject.publish(EventStoreTestUtils.createEvents(() -> aggregateId, 110));
        transactionManager.executeInTransaction(
                () -> storageEngine.storeSnapshot(EventStoreTestUtils.createEvent(aggregateId, 30L))
        );
        // Make the snapshot lookup on the spied engine fail.
        Mockito.when(storageEngine.readSnapshot(aggregateId)).thenThrow(new MockException());

        List<? extends DomainEventMessage<?>> loaded =
                testSubject.readEvents(aggregateId).asStream().collect(Collectors.toList());

        Assertions.assertEquals(110, loaded.size());
        Assertions.assertEquals(0L, loaded.get(0).getSequenceNumber());
        // Last element is at size() - 1.
        Assertions.assertEquals(109L, loaded.get(loaded.size() - 1).getSequenceNumber());
    }

    /**
     * Events published inside a unit of work must be included when the aggregate's events are
     * read again within that same unit of work.
     */
    @Test
    void loadEventsAfterPublishingInSameUnitOfWork() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(() -> aggregateId, CACHED_EVENTS);
        List<DomainEventMessage<?>> alreadyStored = events.subList(0, 2);
        testSubject.publish(alreadyStored);

        LegacyDefaultUnitOfWork.startAndGet((Message) null).execute(() -> {
            long before = testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(2L, before);

            testSubject.publish(events.subList(2, events.size()));

            long after = testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(10L, after);
        });
    }

    /**
     * Reading with a second argument of 2 inside the publishing unit of work must return the
     * eight events published in that unit of work.
     */
    @Test
    void loadEventsWithOffsetAfterPublishingInSameUnitOfWork() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(() -> aggregateId, CACHED_EVENTS);
        List<DomainEventMessage<?>> alreadyStored = events.subList(0, 2);
        testSubject.publish(alreadyStored);

        LegacyDefaultUnitOfWork.startAndGet((Message) null).execute(() -> {
            long before = testSubject.readEvents(aggregateId).asStream().count();
            Assertions.assertEquals(2L, before);

            testSubject.publish(events.subList(2, events.size()));

            long fromOffset = testSubject.readEvents(aggregateId, 2L).asStream().count();
            Assertions.assertEquals(8L, fromOffset);
        });
    }

    /**
     * Events published in a unit of work are visible only while that unit of work is the
     * current one, and disappear again once it is rolled back.
     */
    @Test
    void eventsAppendedInvisibleUntilUnitOfWorkIsCommitted() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(() -> aggregateId, CACHED_EVENTS);
        testSubject.publish(events.subList(0, 2));

        LegacyDefaultUnitOfWork unitOfWork = LegacyDefaultUnitOfWork.startAndGet((Message) null);
        testSubject.publish(events.subList(2, events.size()));

        // Detach the unit of work: only the two previously published events are visible.
        CurrentUnitOfWork.clear(unitOfWork);
        long visibleOutside = testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(2L, visibleOutside);

        // Re-attach it: the staged events become visible as well.
        CurrentUnitOfWork.set(unitOfWork);
        long visibleInside = testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(10L, visibleInside);

        unitOfWork.rollback();
        long visibleAfterRollback = testSubject.readEvents(aggregateId).asStream().count();
        Assertions.assertEquals(2L, visibleAfterRollback);
    }

    /**
     * Publishing inside a unit of work must complete a publish span per event, while the commit
     * span must not start before the unit of work is committed.
     */
    @Test
    void appendEventsCreatesCorrectSpans() {
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(CACHED_EVENTS);
        LegacyDefaultUnitOfWork.startAndGet((Message) null);
        testSubject.publish(events);

        for (DomainEventMessage<?> event : events) {
            spanFactory.verifySpanCompleted("EventBus.publishEvent", event);
            spanFactory.verifySpanPropagated("EventBus.publishEvent", event);
            spanFactory.verifySpanHasType("EventBus.publishEvent", TestSpanFactory.TestSpanType.DISPATCH);
        }

        // The commit span may only appear once the unit of work is committed.
        spanFactory.verifyNotStarted("EventBus.commitEvents");
        CurrentUnitOfWork.commit();
        spanFactory.verifySpanCompleted("EventBus.commitEvents");
        spanFactory.verifySpanHasType("EventBus.commitEvents", TestSpanFactory.TestSpanType.INTERNAL);
    }

    /**
     * With events staged in two units of work started one after the other, every commit-phase
     * callback of both units of work must see exactly ten events for the aggregate.
     */
    @Test
    void stagedEventsNotDuplicatedAfterCommit() {
        String aggregateId = UUID.randomUUID().toString();
        List<DomainEventMessage<?>> events = EventStoreTestUtils.createEvents(() -> aggregateId, CACHED_EVENTS);
        testSubject.publish(events.subList(0, 2));

        LegacyDefaultUnitOfWork firstUnitOfWork = LegacyDefaultUnitOfWork.startAndGet((Message) null);
        testSubject.publish(events.subList(2, 4));
        LegacyDefaultUnitOfWork secondUnitOfWork = LegacyDefaultUnitOfWork.startAndGet((Message) null);
        testSubject.publish(events.subList(4, events.size()));

        // Shared check, registered on each commit phase of both units of work.
        Consumer assertTenEventsVisible = ignoredUnitOfWork ->
                Assertions.assertEquals(10L, testSubject.readEvents(aggregateId).asStream().count());

        secondUnitOfWork.onPrepareCommit(assertTenEventsVisible);
        secondUnitOfWork.afterCommit(assertTenEventsVisible);
        secondUnitOfWork.onCommit(assertTenEventsVisible);

        firstUnitOfWork.onPrepareCommit(assertTenEventsVisible);
        firstUnitOfWork.afterCommit(assertTenEventsVisible);
        firstUnitOfWork.onCommit(assertTenEventsVisible);

        // Commit the most recently started unit of work first.
        secondUnitOfWork.commit();
        firstUnitOfWork.commit();
    }

    /**
     * Once a blocked reader has been served an event, the configured thread factory must have
     * been asked for at least one thread.
     */
    @Timeout(5)
    @Test
    void customThreadFactoryIsUsed() throws Exception {
        // Exactly one event is expected, so the latch starts at 1.
        CountDownLatch eventReceived = new CountDownLatch(1);
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Thread reader = new Thread(
                () -> eventStream.asStream().findFirst().ifPresent(event -> eventReceived.countDown())
        );
        reader.start();

        Assertions.assertFalse(eventReceived.await(100L, TimeUnit.MILLISECONDS));

        testSubject.publish(EventStoreTestUtils.createEvent());
        reader.join();
        Assertions.assertFalse(reader.isAlive());
        Assertions.assertEquals(0L, eventReceived.getCount());

        Mockito.verify(threadFactory, Mockito.atLeastOnce()).newThread(Mockito.any(Runnable.class));
    }

    /**
     * After publishing five events the stream can be drained completely, and the configured
     * thread factory has been asked for at least one thread.
     */
    @Test
    void openStreamReadsEventsFromAnEventProducedByVerifyThreadFactoryOperation() throws InterruptedException {
        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertFalse(eventStream.hasNextAvailable());

        testSubject.publish(EventStoreTestUtils.createEvents(5));
        Thread.sleep(100L);
        Assertions.assertTrue(eventStream.hasNextAvailable());

        // Drain the stream until nothing is left.
        do {
            eventStream.nextAvailable();
        } while (eventStream.hasNextAvailable());
        Assertions.assertFalse(eventStream.hasNextAvailable());

        Mockito.verify(threadFactory, Mockito.atLeastOnce()).newThread(Mockito.any(Runnable.class));
    }

    /**
     * With the builder's {@code optimizeEventConsumption} setting disabled, draining a stream
     * must never touch the thread factory.
     */
    @Test
    void tailingConsumptionThreadIsNeverCreatedIfEventConsumptionOptimizationIsSwitchedOff() throws InterruptedException {
        boolean optimizationDisabled = false;
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, optimizationDisabled);

        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        testSubject.publish(EventStoreTestUtils.createEvents(5));

        // Drain whatever the stream offers.
        while (eventStream.hasNextAvailable()) {
            eventStream.nextAvailable();
        }

        Mockito.verifyNoInteractions(threadFactory);
    }

    /**
     * With the builder's {@code optimizeEventConsumption} setting disabled, published events are
     * available on the stream right away and the stream can be drained completely.
     */
    @Test
    void eventStreamKeepsReturningEventsIfEventConsumptionOptimizationIsSwitchedOff() throws InterruptedException {
        boolean optimizationDisabled = false;
        newTestSubject(CACHED_EVENTS, FETCH_DELAY, CLEANUP_DELAY, optimizationDisabled);

        TrackingEventStream eventStream = testSubject.openStream((TrackingToken) null);
        Assertions.assertFalse(eventStream.hasNextAvailable());

        testSubject.publish(EventStoreTestUtils.createEvents(5));
        Assertions.assertTrue(eventStream.hasNextAvailable());

        // Drain the stream until nothing is left.
        do {
            eventStream.nextAvailable();
        } while (eventStream.hasNextAvailable());
        Assertions.assertFalse(eventStream.hasNextAvailable());
    }
}
