package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import junit.framework.TestCase;
import org.axonframework.common.MockException;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStoreTestUtils;
import org.axonframework.eventsourcing.eventstore.EventUtils;
import org.axonframework.eventsourcing.eventstore.GlobalSequenceTrackingToken;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.springframework.test.annotation.DirtiesContext;

/* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessorTest.class */
public class TrackingEventProcessorTest {
    private TrackingEventProcessor testSubject;
    private EmbeddedEventStore eventBus;
    private TokenStore tokenStore;
    private EventHandlerInvoker eventHandlerInvoker;
    private EventListener mockListener;

    @Before
    public void setUp() throws Exception {
        this.tokenStore = (TokenStore) Mockito.spy(new InMemoryTokenStore());
        this.mockListener = (EventListener) Mockito.mock(EventListener.class);
        this.eventHandlerInvoker = new SimpleEventHandlerInvoker(new Object[]{this.mockListener});
        this.eventBus = new EmbeddedEventStore(new InMemoryEventStorageEngine());
        this.testSubject = new TrackingEventProcessor("test", this.eventHandlerInvoker, this.eventBus, this.tokenStore, NoTransactionManager.INSTANCE);
        this.testSubject.start();
    }

    @After
    public void tearDown() throws Exception {
        this.testSubject.shutDown();
        this.eventBus.shutDown();
    }

    @Test
    public void testPublishedEventsGetPassedToListener() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        ((EventListener) Mockito.doAnswer(invocationOnMock -> {
            countDownLatch.countDown();
            return null;
        }).when(this.mockListener)).handle((EventMessage) Matchers.any());
        this.eventBus.publish(EventStoreTestUtils.createEvents(2));
        TestCase.assertTrue("Expected listener to have received 2 published events", countDownLatch.await(5L, TimeUnit.SECONDS));
    }

    @Test
    public void testTokenIsStoredWhenEventIsRead() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.testSubject.registerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(unitOfWork -> {
                countDownLatch.countDown();
            });
            return interceptorChain.proceed();
        });
        this.eventBus.publish(new EventMessage[]{EventStoreTestUtils.createEvent()});
        TestCase.assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5L, TimeUnit.SECONDS));
        ((TokenStore) Mockito.verify(this.tokenStore)).storeToken((TrackingToken) Matchers.any(), (String) Matchers.any(), Mockito.anyInt());
        TestCase.assertNotNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    @Test
    public void testTokenIsNotStoredWhenUnitOfWorkIsRolledBack() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.testSubject.registerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCommit(unitOfWork -> {
                throw new MockException();
            });
            return interceptorChain.proceed();
        });
        this.testSubject.registerInterceptor((unitOfWork2, interceptorChain2) -> {
            unitOfWork2.onCleanup(unitOfWork2 -> {
                countDownLatch.countDown();
            });
            return interceptorChain2.proceed();
        });
        this.eventBus.publish(new EventMessage[]{EventStoreTestUtils.createEvent()});
        TestCase.assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5L, TimeUnit.SECONDS));
        TestCase.assertNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    @Test
    @DirtiesContext
    public void testContinueFromPreviousToken() throws Exception {
        this.testSubject.shutDown();
        this.tokenStore = new InMemoryTokenStore();
        this.eventBus.publish(EventStoreTestUtils.createEvents(10));
        TrackedEventMessage nextAvailable = this.eventBus.streamEvents((TrackingToken) null).nextAvailable();
        this.tokenStore.storeToken(nextAvailable.trackingToken(), this.testSubject.getName(), 0);
        TestCase.assertEquals(nextAvailable.trackingToken(), this.tokenStore.fetchToken(this.testSubject.getName(), 0));
        ArrayList arrayList = new ArrayList();
        CountDownLatch countDownLatch = new CountDownLatch(9);
        ((EventListener) Mockito.doAnswer(invocationOnMock -> {
            arrayList.add((EventMessage) invocationOnMock.getArguments()[0]);
            countDownLatch.countDown();
            return null;
        }).when(this.mockListener)).handle((EventMessage) Matchers.any());
        this.testSubject = new TrackingEventProcessor("test", this.eventHandlerInvoker, this.eventBus, this.tokenStore, NoTransactionManager.INSTANCE);
        this.testSubject.start();
        TestCase.assertTrue("Expected 9 invocations on event listener by now", countDownLatch.await(5L, TimeUnit.SECONDS));
        TestCase.assertEquals(9, arrayList.size());
    }

    @Test
    public void testFirstTokenIsStoredWhenUnitOfWorkIsRolledBackOnSecondEvent() throws Exception {
        List<DomainEventMessage<?>> createEvents = EventStoreTestUtils.createEvents(2);
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.testSubject.registerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCommit(unitOfWork -> {
                if (unitOfWork.getMessage().equals(createEvents.get(1))) {
                    throw new MockException();
                }
            });
            return interceptorChain.proceed();
        });
        this.testSubject.registerInterceptor((unitOfWork2, interceptorChain2) -> {
            unitOfWork2.onCleanup(unitOfWork2 -> {
                countDownLatch.countDown();
            });
            return interceptorChain2.proceed();
        });
        this.eventBus.publish(createEvents);
        TestCase.assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5L, TimeUnit.SECONDS));
        ((TokenStore) Mockito.verify(this.tokenStore, Mockito.atLeastOnce())).storeToken((TrackingToken) Matchers.any(), (String) Matchers.any(), Mockito.anyInt());
        TestCase.assertNotNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    @Test
    @DirtiesContext
    public void testEventsWithTheSameTokenAreProcessedInTheSameBatch() throws Exception {
        this.testSubject.shutDown();
        this.eventBus.shutDown();
        this.eventBus = (EmbeddedEventStore) Mockito.mock(EmbeddedEventStore.class);
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(0L);
        List list = (List) EventStoreTestUtils.createEvents(2).stream().map(domainEventMessage -> {
            return EventUtils.asTrackedEventMessage(domainEventMessage, globalSequenceTrackingToken);
        }).collect(Collectors.toList());
        Mockito.when(this.eventBus.streamEvents((TrackingToken) null)).thenReturn(trackingEventStreamOf(list.iterator()));
        this.testSubject = new TrackingEventProcessor("test", this.eventHandlerInvoker, this.eventBus, this.tokenStore, NoTransactionManager.INSTANCE);
        this.testSubject.registerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCommit(unitOfWork -> {
                if (unitOfWork.getMessage().equals(list.get(1))) {
                    throw new MockException();
                }
            });
            return interceptorChain.proceed();
        });
        CountDownLatch countDownLatch = new CountDownLatch(2);
        this.testSubject.registerInterceptor((unitOfWork2, interceptorChain2) -> {
            unitOfWork2.onCleanup(unitOfWork2 -> {
                countDownLatch.countDown();
            });
            return interceptorChain2.proceed();
        });
        this.testSubject.start();
        TestCase.assertTrue("Expected Unit of Work to have reached clean up phase", countDownLatch.await(5L, TimeUnit.SECONDS));
        ((TokenStore) Mockito.verify(this.tokenStore, Mockito.atLeastOnce())).storeToken((TrackingToken) Matchers.any(), (String) Matchers.any(), Mockito.anyInt());
        TestCase.assertNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    private static TrackingEventStream trackingEventStreamOf(final Iterator<TrackedEventMessage<?>> it) {
        return new TrackingEventStream() { // from class: org.axonframework.eventhandling.TrackingEventProcessorTest.1
            private boolean hasPeeked;
            private TrackedEventMessage<?> peekEvent;

            public Optional<TrackedEventMessage<?>> peek() {
                if (!this.hasPeeked) {
                    if (!hasNextAvailable()) {
                        return Optional.empty();
                    }
                    this.peekEvent = (TrackedEventMessage) it.next();
                    this.hasPeeked = true;
                }
                return Optional.of(this.peekEvent);
            }

            public boolean hasNextAvailable(int i, TimeUnit timeUnit) {
                return this.hasPeeked || it.hasNext();
            }

            public TrackedEventMessage nextAvailable() {
                if (!this.hasPeeked) {
                    return (TrackedEventMessage) it.next();
                }
                TrackedEventMessage<?> trackedEventMessage = this.peekEvent;
                this.peekEvent = null;
                this.hasPeeked = false;
                return trackedEventMessage;
            }

            public void close() {
            }
        };
    }
}
