package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.reflect.Executable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import junit.framework.TestCase;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.MockException;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.GenericDomainEventMessage;
import org.axonframework.eventsourcing.SnapshotTrigger;
import org.axonframework.eventsourcing.SnapshotTriggerDefinition;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.EventStoreTestUtils;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.annotation.ClasspathParameterResolverFactory;
import org.axonframework.messaging.annotation.ParameterResolverFactory;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest.class */
public class DisruptorCommandBusTest {
    // Number of commands dispatched by testCommandProcessedAndEventsStored().
    private static final int COMMAND_COUNT = 100000;
    // Command handler subscribed to the bus under test; re-created in setUp().
    private StubHandler stubHandler;
    // In-memory event store handed to every DisruptorCommandBus; re-created in setUp().
    private InMemoryEventStore eventStore;
    // Bus under test; created by each test method and stopped in tearDown().
    private DisruptorCommandBus testSubject;
    // Random identifier of the aggregate targeted by the commands; regenerated in setUp().
    private String aggregateIdentifier;
    // Only assigned by testEventPublicationExecutedWithinTransaction(); null in all other tests.
    private TransactionManager mockTransactionManager;
    // Mockito spy around the classpath-based factory, so interactions with it can be verified.
    private ParameterResolverFactory parameterResolverFactory;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$CreateCommand.class */
    /**
     * Command for which {@link StubHandler} creates a new {@link StubAggregate}
     * instead of loading an existing one.
     */
    public static class CreateCommand extends StubCommand {
        public CreateCommand(Object aggregateIdentifier) {
            super(aggregateIdentifier);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$ErrorCommand.class */
    /**
     * Command for which {@link StubHandler} makes the loaded aggregate apply a
     * {@link FailingEvent}, which {@link InMemoryEventStore} refuses to store.
     */
    private static class ErrorCommand extends StubCommand {
        public ErrorCommand(Object aggregateIdentifier) {
            super(aggregateIdentifier);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$ExceptionCommand.class */
    /**
     * Command carrying an exception; {@link StubHandler} throws that exception
     * as soon as it receives the command.
     */
    public static class ExceptionCommand extends StubCommand {
        private final Exception failure;

        public ExceptionCommand(Object aggregateIdentifier, Exception failure) {
            super(aggregateIdentifier);
            this.failure = failure;
        }

        /** Returns the exception that was passed to the constructor. */
        public Exception getException() {
            return failure;
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$FailingEvent.class */
    /**
     * Marker event payload. The in-memory event store of this test throws a
     * MockException when asked to publish an event whose payload is of this type.
     */
    static class FailingEvent {
        // The implicit default constructor is package-private, like the class itself.
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$InMemoryEventStore.class */
    /**
     * Minimal in-memory {@link EventStore} for these tests.
     * <p>
     * Only the last event of the most recent publication is retained per aggregate
     * identifier. Every event passed to {@link #publish(List)} counts the latch down
     * once, and publishing an event whose payload is a {@link FailingEvent} is rejected
     * with a {@link MockException}. Streaming, subscribing and dispatch interceptors are
     * not supported.
     */
    public static class InMemoryEventStore implements EventStore {
        // Last published event per aggregate identifier.
        private final Map<String, DomainEventMessage<?>> storedEvents;
        // Counted down once for each event handed to publish().
        private final CountDownLatch countDownLatch;

        private InMemoryEventStore() {
            this.storedEvents = new ConcurrentHashMap<>();
            // 100001 = the single event seeded by setUp() plus one event for each of the
            // 100000 commands dispatched by testCommandProcessedAndEventsStored().
            this.countDownLatch = new CountDownLatch(100001);
        }

        /**
         * Returns a stream holding the last stored event of the given aggregate, or an
         * empty stream when nothing is stored for it.
         */
        public DomainEventStream readEvents(String str) {
            DomainEventMessage<?> lastEvent = this.storedEvents.get(str);
            return lastEvent == null ? DomainEventStream.empty() : DomainEventStream.of(lastEvent);
        }

        /**
         * Records the last event of the given list under the aggregate identifier of the
         * first event. A {@code null} or empty list is ignored.
         *
         * @throws MockException when one of the events carries a {@link FailingEvent}
         *                       payload; nothing from the list is stored in that case
         */
        public void publish(List<? extends EventMessage<?>> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            // The aggregate identifier is declared on DomainEventMessage, not on
            // EventMessage, so the first element has to be narrowed before reading it.
            String aggregateIdentifier = ((DomainEventMessage<?>) list.get(0)).getAggregateIdentifier();
            DomainEventMessage<?> lastEvent = null;
            for (EventMessage<?> eventMessage : list) {
                // Count before the failure check: rejected events still count as "seen".
                this.countDownLatch.countDown();
                lastEvent = (DomainEventMessage<?>) eventMessage;
                if (FailingEvent.class.isAssignableFrom(lastEvent.getPayloadType())) {
                    throw new MockException("This is a failing event. EventStore refuses to store that");
                }
            }
            this.storedEvents.put(aggregateIdentifier, lastEvent);
        }

        /** Not supported by this stub. */
        public TrackingEventStream streamEvents(TrackingToken trackingToken) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Registration registerDispatchInterceptor(MessageDispatchInterceptor<EventMessage<?>> messageDispatchInterceptor) {
            throw new UnsupportedOperationException();
        }

        /** Snapshots are silently discarded. */
        public void storeSnapshot(DomainEventMessage<?> domainEventMessage) {
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$Parameter.class */
    /**
     * Mockito {@link Answer} that returns the invocation argument found at a fixed
     * position.
     */
    private static class Parameter implements Answer<Object> {
        private final int index;

        private Parameter(int index) {
            this.index = index;
        }

        /** Returns the argument at the configured index of the intercepted call. */
        public Object answer(InvocationOnMock invocation) throws Exception {
            Object[] arguments = invocation.getArguments();
            return arguments[index];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$StubAggregate.class */
    /**
     * Aggregate used by the tests. Its identifier is set by the creating constructor
     * and updated from the aggregate identifier of every event it handles.
     */
    public static class StubAggregate {
        private static final long serialVersionUID = 8192033940704210095L;

        @AggregateIdentifier
        private String identifier;

        /** Creates the aggregate with the given identifier and applies a first event. */
        private StubAggregate(String identifier) {
            this.identifier = identifier;
            AggregateLifecycle.apply(new SomethingDoneEvent());
        }

        /** No-arg constructor; leaves the identifier unset. */
        public StubAggregate() {
        }

        public String getIdentifier() {
            return identifier;
        }

        /** Applies a {@code SomethingDoneEvent}. */
        public void doSomething() {
            AggregateLifecycle.apply(new SomethingDoneEvent());
        }

        /** Applies a {@link FailingEvent}. */
        public void createFailingEvent() {
            AggregateLifecycle.apply(new FailingEvent());
        }

        /** Copies the aggregate identifier of the handled event into this aggregate. */
        @EventSourcingHandler
        protected void handle(EventMessage eventMessage) {
            DomainEventMessage domainEvent = (DomainEventMessage) eventMessage;
            identifier = domainEvent.getAggregateIdentifier();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$StubCommand.class */
    /**
     * Base command of this test; carries the identifier of the aggregate it targets.
     */
    public static class StubCommand {

        @TargetAggregateIdentifier
        private Object aggregateIdentifier;

        public StubCommand(Object aggregateIdentifier) {
            this.aggregateIdentifier = aggregateIdentifier;
        }

        /** Returns the string form of the identifier passed to the constructor. */
        public String getAggregateIdentifier() {
            return aggregateIdentifier.toString();
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$StubDomainEvent.class */
    /**
     * Empty event payload; setUp() publishes one to seed the event store.
     */
    private static class StubDomainEvent {
        // The implicit default constructor is private, like the class itself.
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBusTest$StubHandler.class */
    /**
     * Command handler used by every test. Its behaviour depends on the payload type:
     * <ul>
     * <li>{@link ExceptionCommand}: throws the exception carried by the command;</li>
     * <li>{@link CreateCommand}: creates a new {@link StubAggregate} and invokes
     * {@code doSomething()} on it;</li>
     * <li>{@link ErrorCommand}: loads the aggregate and makes it apply a
     * {@link FailingEvent};</li>
     * <li>any other {@link StubCommand}: loads the aggregate and invokes
     * {@code doSomething()}.</li>
     * </ul>
     * The repository must be set through {@link #setRepository(Repository)} before any
     * command other than an {@link ExceptionCommand} is handled.
     */
    public static class StubHandler implements MessageHandler<CommandMessage<?>> {
        private Repository<StubAggregate> repository;

        private StubHandler() {
        }

        /**
         * Handles the given command as described on the class.
         *
         * @param commandMessage command whose payload must be a {@link StubCommand}
         * @return always {@code null}
         * @throws Exception the exception carried by an {@link ExceptionCommand}, or
         *                   whatever the repository or the aggregate throws
         */
        public Object handle(CommandMessage<?> commandMessage) throws Exception {
            Class<?> payloadType = commandMessage.getPayloadType();
            StubCommand stubCommand = (StubCommand) commandMessage.getPayload();
            if (ExceptionCommand.class.isAssignableFrom(payloadType)) {
                throw ((ExceptionCommand) stubCommand).getException();
            }
            if (CreateCommand.class.isAssignableFrom(payloadType)) {
                this.repository.newInstance(() -> new StubAggregate(stubCommand.getAggregateIdentifier()))
                               .execute(StubAggregate::doSomething);
                return null;
            }
            // Typed (not raw) so that execute(...) receives a StubAggregate rather than an Object.
            Aggregate<StubAggregate> aggregate = this.repository.load(stubCommand.getAggregateIdentifier());
            if (ErrorCommand.class.isAssignableFrom(payloadType)) {
                aggregate.execute(StubAggregate::createFailingEvent);
            } else {
                aggregate.execute(StubAggregate::doSomething);
            }
            return null;
        }

        /** Sets the repository used to create and load aggregates. */
        public void setRepository(Repository<StubAggregate> repository) {
            this.repository = repository;
        }
    }

    /**
     * Creates a fresh aggregate identifier, handler and event store, seeds the store
     * with one event (sequence number 0) for that aggregate, and wraps the classpath
     * parameter resolver factory in a Mockito spy.
     */
    @Before
    public void setUp() throws Exception {
        aggregateIdentifier = UUID.randomUUID().toString();
        stubHandler = new StubHandler();
        eventStore = new InMemoryEventStore();

        GenericDomainEventMessage seedEvent =
                new GenericDomainEventMessage(EventStoreTestUtils.TYPE, aggregateIdentifier, 0L, new StubDomainEvent());
        eventStore.publish(Collections.singletonList(seedEvent));

        parameterResolverFactory = (ParameterResolverFactory) Mockito.spy(
                ClasspathParameterResolverFactory.forClass(DisruptorCommandBusTest.class));
    }

    /**
     * Stops the bus created by the test, if any.
     * <p>
     * The bus is only assigned inside the test methods, so it is still {@code null}
     * when setUp() or the bus construction itself failed. Guarding against that keeps
     * the original failure from being accompanied by a misleading NullPointerException.
     */
    @After
    public void tearDown() {
        if (this.testSubject != null) {
            this.testSubject.stop();
        }
    }

    /**
     * Dispatches one command through a bus with a dispatch interceptor and a handler
     * interceptor. Verifies that the dispatch interceptor, the handler interceptor and
     * the prepare-commit, after-commit and cleanup callbacks registered on the unit of
     * work are invoked in that order, and that the command callback reports success.
     * <p>
     * Note: the command callback is verified outside the {@link InOrder}, so its
     * position relative to the other invocations is not asserted.
     */
    @Test
    public void testCallbackInvokedBeforeUnitOfWorkCleanup() throws Exception {
        MessageHandlerInterceptor handlerInterceptor = Mockito.mock(MessageHandlerInterceptor.class);
        MessageDispatchInterceptor dispatchInterceptor = Mockito.mock(MessageDispatchInterceptor.class);
        // The dispatch interceptor passes the command through unchanged.
        Mockito.when(dispatchInterceptor.handle((Message) Mockito.isA(CommandMessage.class)))
               .thenAnswer(new Parameter(0));

        ExecutorService executor = Executors.newCachedThreadPool();
        DisruptorConfiguration configuration = new DisruptorConfiguration()
                .setInvokerInterceptors(Collections.singletonList(handlerInterceptor))
                .setDispatchInterceptors(Collections.singletonList(dispatchInterceptor))
                .setBufferSize(8)
                .setProducerType(ProducerType.SINGLE)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setExecutor(executor)
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3);
        testSubject = new DisruptorCommandBus(eventStore, configuration);
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        stubHandler.setRepository(
                testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class), parameterResolverFactory));

        Consumer prepareCommitConsumer = Mockito.mock(Consumer.class);
        Consumer afterCommitConsumer = Mockito.mock(Consumer.class);
        Consumer cleanupConsumer = Mockito.mock(Consumer.class);
        // The handler interceptor registers the three lifecycle callbacks, then proceeds.
        Mockito.when(handlerInterceptor.handle((UnitOfWork) Mockito.any(UnitOfWork.class),
                                               (InterceptorChain) Mockito.any(InterceptorChain.class)))
               .thenAnswer(invocation -> {
                   Object[] arguments = invocation.getArguments();
                   UnitOfWork unitOfWork = (UnitOfWork) arguments[0];
                   unitOfWork.onPrepareCommit(prepareCommitConsumer);
                   unitOfWork.afterCommit(afterCommitConsumer);
                   unitOfWork.onCleanup(cleanupConsumer);
                   return ((InterceptorChain) arguments[1]).proceed();
               });

        CommandMessage command = GenericCommandMessage.asCommandMessage(new StubCommand(aggregateIdentifier));
        CommandCallback callback = Mockito.mock(CommandCallback.class);
        testSubject.dispatch(command, callback);
        testSubject.stop();

        // Stopping the bus must not have terminated the executor supplied by the test.
        TestCase.assertFalse(executor.awaitTermination(250L, TimeUnit.MILLISECONDS));
        executor.shutdown();
        TestCase.assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS));

        InOrder inOrder = Mockito.inOrder(dispatchInterceptor, handlerInterceptor, prepareCommitConsumer,
                                          afterCommitConsumer, cleanupConsumer, callback);
        inOrder.verify(dispatchInterceptor).handle((Message) Mockito.isA(CommandMessage.class));
        inOrder.verify(handlerInterceptor).handle((UnitOfWork) Mockito.any(UnitOfWork.class),
                                                  (InterceptorChain) Mockito.any(InterceptorChain.class));
        inOrder.verify(prepareCommitConsumer).accept(Mockito.isA(UnitOfWork.class));
        inOrder.verify(afterCommitConsumer).accept(Mockito.isA(UnitOfWork.class));
        inOrder.verify(cleanupConsumer).accept(Mockito.isA(UnitOfWork.class));

        Mockito.verify(callback).onSuccess((CommandMessage) Mockito.eq(command), Mockito.any());
    }

    /**
     * Dispatching a command for which no handler is subscribed must fail with a
     * {@link NoHandlerForCommandException} whose message names the payload type.
     */
    @Test
    public void testPublishUnsupportedCommand() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        DisruptorConfiguration configuration = new DisruptorConfiguration()
                .setBufferSize(8)
                .setProducerType(ProducerType.SINGLE)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setExecutor(executor)
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3);
        testSubject = new DisruptorCommandBus(eventStore, configuration);
        try {
            testSubject.dispatch(GenericCommandMessage.asCommandMessage("Test"));
            TestCase.fail("Expected exception");
        } catch (NoHandlerForCommandException expected) {
            String message = expected.getMessage();
            TestCase.assertTrue(message.contains(String.class.getSimpleName()));
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Creates the repository with a mocked snapshot trigger definition, dispatches the
     * same command twice and verifies the calls received by the snapshot trigger: one
     * handled event, then {@code initializationFinished()}, then two more handled
     * events.
     */
    @Test
    public void testEventStreamsDecoratedOnReadAndWrite() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        DisruptorConfiguration configuration = new DisruptorConfiguration()
                .setBufferSize(8)
                .setProducerType(ProducerType.SINGLE)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setExecutor(executor)
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3);
        testSubject = new DisruptorCommandBus(eventStore, configuration);
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);

        SnapshotTriggerDefinition triggerDefinition = Mockito.mock(SnapshotTriggerDefinition.class);
        SnapshotTrigger trigger = Mockito.mock(SnapshotTrigger.class);
        Mockito.when(triggerDefinition.prepareTrigger((Class) Mockito.any())).thenReturn(trigger);
        stubHandler.setRepository(
                testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class), triggerDefinition));

        CommandMessage command = GenericCommandMessage.asCommandMessage(new StubCommand(aggregateIdentifier));
        CommandCallback callback = Mockito.mock(CommandCallback.class);
        testSubject.dispatch(command, callback);
        testSubject.dispatch(command);
        testSubject.stop();

        // Stopping the bus must not have terminated the executor supplied by the test.
        TestCase.assertFalse(executor.awaitTermination(250L, TimeUnit.MILLISECONDS));
        executor.shutdown();
        TestCase.assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS));

        InOrder inOrder = Mockito.inOrder(trigger);
        inOrder.verify(trigger).eventHandled((EventMessage) Mockito.isA(DomainEventMessage.class));
        inOrder.verify(trigger).initializationFinished();
        inOrder.verify(trigger, Mockito.times(2)).eventHandled((EventMessage) Mockito.isA(DomainEventMessage.class));
    }

    /**
     * Creating a repository with an explicit parameter resolver factory must call
     * {@code createInstance} on that factory at least once, and nothing else on it.
     */
    @Test
    public void usesProvidedParameterResolverFactoryToResolveParameters() throws Exception {
        testSubject = new DisruptorCommandBus(eventStore);
        testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class), parameterResolverFactory);

        ParameterResolverFactory verifiedFactory = Mockito.verify(parameterResolverFactory, Mockito.atLeastOnce());
        // java.lang.reflect.Parameter is fully qualified: the nested Parameter class shadows it.
        verifiedFactory.createInstance((Executable) Mockito.isA(Executable.class),
                                       (java.lang.reflect.Parameter[]) Mockito.isA(java.lang.reflect.Parameter[].class),
                                       Mockito.anyInt());
        Mockito.verifyNoMoreInteractions(parameterResolverFactory);
    }

    /**
     * Runs the dispatchCommands scenario with a mocked transaction manager and verifies
     * that exactly 991 transactions were started and committed, with no other
     * interaction on the transaction or its manager. The scenario dispatches 1 create
     * command, 990 regular commands and 10 error commands.
     */
    @Test
    public void testEventPublicationExecutedWithinTransaction() throws Exception {
        MessageHandlerInterceptor handlerInterceptor = Mockito.mock(MessageHandlerInterceptor.class);
        ExecutorService executor = Executors.newCachedThreadPool();
        Transaction transaction = Mockito.mock(Transaction.class);
        mockTransactionManager = Mockito.mock(TransactionManager.class);
        Mockito.when(mockTransactionManager.startTransaction()).thenReturn(transaction);

        CommandMessage<ErrorCommand> errorCommand =
                GenericCommandMessage.asCommandMessage(new ErrorCommand(aggregateIdentifier));
        dispatchCommands(handlerInterceptor, executor, errorCommand);

        // Stopping the bus must not have terminated the executor supplied by the test.
        TestCase.assertFalse(executor.awaitTermination(250L, TimeUnit.MILLISECONDS));
        executor.shutdown();
        TestCase.assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS));

        Mockito.verify(mockTransactionManager, Mockito.times(991)).startTransaction();
        Mockito.verify(transaction, Mockito.times(991)).commit();
        Mockito.verifyNoMoreInteractions(transaction, mockTransactionManager);
    }

    /**
     * Runs the dispatchCommands scenario (990 regular commands and 10 error commands
     * sharing one callback) and verifies 990 successes and 10 failures with a
     * {@link RuntimeException}.
     * <p>
     * NOTE(review): no reschedule-related setting is configured here; the body is the
     * same as testAggregatesBlacklistedAndRecoveredOnError_WithoutReschedule.
     */
    @Test(timeout = 10000)
    public void testAggregatesBlacklistedAndRecoveredOnError_WithAutoReschedule() throws Exception {
        MessageHandlerInterceptor handlerInterceptor = Mockito.mock(MessageHandlerInterceptor.class);
        ExecutorService executor = Executors.newCachedThreadPool();
        CommandMessage<ErrorCommand> errorCommand =
                GenericCommandMessage.asCommandMessage(new ErrorCommand(aggregateIdentifier));

        CommandCallback callback = dispatchCommands(handlerInterceptor, executor, errorCommand);

        // Stopping the bus must not have terminated the executor supplied by the test.
        TestCase.assertFalse(executor.awaitTermination(250L, TimeUnit.MILLISECONDS));
        executor.shutdown();
        TestCase.assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS));

        Mockito.verify(callback, Mockito.times(990)).onSuccess((CommandMessage) Mockito.any(), Mockito.any());
        Mockito.verify(callback, Mockito.times(10))
               .onFailure((CommandMessage) Mockito.any(), (Throwable) Mockito.isA(RuntimeException.class));
    }

    /**
     * Runs the dispatchCommands scenario (990 regular commands and 10 error commands
     * sharing one callback) and verifies 990 successes and 10 failures with a
     * {@link RuntimeException}.
     * <p>
     * NOTE(review): no reschedule-related setting is configured here; the body is the
     * same as testAggregatesBlacklistedAndRecoveredOnError_WithAutoReschedule.
     */
    @Test(timeout = 10000)
    public void testAggregatesBlacklistedAndRecoveredOnError_WithoutReschedule() throws Exception {
        MessageHandlerInterceptor handlerInterceptor = Mockito.mock(MessageHandlerInterceptor.class);
        ExecutorService executor = Executors.newCachedThreadPool();
        CommandMessage<ErrorCommand> errorCommand =
                GenericCommandMessage.asCommandMessage(new ErrorCommand(aggregateIdentifier));

        CommandCallback callback = dispatchCommands(handlerInterceptor, executor, errorCommand);

        // Stopping the bus must not have terminated the executor supplied by the test.
        TestCase.assertFalse(executor.awaitTermination(250L, TimeUnit.MILLISECONDS));
        executor.shutdown();
        TestCase.assertTrue(executor.awaitTermination(5L, TimeUnit.SECONDS));

        Mockito.verify(callback, Mockito.times(990)).onSuccess((CommandMessage) Mockito.any(), Mockito.any());
        Mockito.verify(callback, Mockito.times(10))
               .onFailure((CommandMessage) Mockito.any(), (Throwable) Mockito.isA(RuntimeException.class));
    }

    /**
     * Shared scenario: clears the event store, builds a multi-producer bus that rolls
     * back on any throwable, creates the aggregate with a {@link CreateCommand} and then
     * dispatches 1000 commands with one shared mock callback. Every command whose index
     * satisfies {@code i % 100 == 10} (10 in total) is the supplied command; the other
     * 990 are plain {@link StubCommand}s. The bus is stopped before returning.
     *
     * @param messageHandlerInterceptor mock interceptor, stubbed here to simply proceed
     * @param executorService           executor handed to the bus configuration
     * @param commandMessage            command dispatched at the 10 selected indices
     * @return the mock callback that received the outcome of the 1000 commands
     */
    private CommandCallback dispatchCommands(MessageHandlerInterceptor messageHandlerInterceptor, ExecutorService executorService, CommandMessage<ErrorCommand> commandMessage) throws Exception {
        eventStore.storedEvents.clear();
        DisruptorConfiguration configuration = new DisruptorConfiguration()
                .setInvokerInterceptors(Arrays.asList(messageHandlerInterceptor))
                .setBufferSize(8)
                .setProducerType(ProducerType.MULTI)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setExecutor(executorService)
                .setRollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE)
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3)
                .setTransactionManager(mockTransactionManager);
        testSubject = new DisruptorCommandBus(eventStore, configuration);

        String stubCommandName = StubCommand.class.getName();
        String createCommandName = CreateCommand.class.getName();
        String errorCommandName = ErrorCommand.class.getName();
        testSubject.subscribe(stubCommandName, stubHandler);
        testSubject.subscribe(createCommandName, stubHandler);
        testSubject.subscribe(errorCommandName, stubHandler);
        stubHandler.setRepository(testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class)));

        Mockito.when(messageHandlerInterceptor.handle((UnitOfWork) Mockito.any(UnitOfWork.class),
                                                      (InterceptorChain) Mockito.any(InterceptorChain.class)))
               .thenAnswer(invocation -> ((InterceptorChain) invocation.getArguments()[1]).proceed());

        testSubject.dispatch(GenericCommandMessage.asCommandMessage(new CreateCommand(aggregateIdentifier)));

        CommandCallback sharedCallback = Mockito.mock(CommandCallback.class);
        for (int index = 0; index < 1000; index++) {
            boolean useSuppliedCommand = index % 100 == 10;
            testSubject.dispatch(useSuppliedCommand ? commandMessage : GenericCommandMessage.asCommandMessage(new StubCommand(aggregateIdentifier)), sharedCallback);
        }
        testSubject.stop();
        return sharedCallback;
    }

    /**
     * Dispatches a single {@link CreateCommand} against an emptied event store and
     * checks that the last stored event of the aggregate has sequence number 1. The
     * handler applies two events for a create command: one in the aggregate constructor
     * and one in {@code doSomething()}.
     */
    @Test
    public void testCreateAggregate() {
        eventStore.storedEvents.clear();
        DisruptorConfiguration configuration = new DisruptorConfiguration()
                .setBufferSize(8)
                .setProducerType(ProducerType.SINGLE)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3);
        testSubject = new DisruptorCommandBus(eventStore, configuration);
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
        testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);
        stubHandler.setRepository(testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class)));

        testSubject.dispatch(GenericCommandMessage.asCommandMessage(new CreateCommand(aggregateIdentifier)));
        testSubject.stop();

        DomainEventMessage lastEvent = (DomainEventMessage) eventStore.storedEvents.get(aggregateIdentifier);
        TestCase.assertEquals(1L, lastEvent.getSequenceNumber());
        TestCase.assertEquals(aggregateIdentifier, lastEvent.getAggregateIdentifier());
    }

    /**
     * Installs a message monitor that counts success, failure and ignored reports, then
     * dispatches the same five-command sequence (create, regular, error, regular,
     * regular) for two aggregates. Expects 8 successes, 2 failures and 0 ignored.
     */
    @Test
    public void testMessageMonitoring() throws InterruptedException {
        eventStore.storedEvents.clear();
        AtomicLong successCounter = new AtomicLong();
        AtomicLong failureCounter = new AtomicLong();
        AtomicLong ignoredCounter = new AtomicLong();

        testSubject = new DisruptorCommandBus(eventStore, new DisruptorConfiguration().setBufferSize(8).setMessageMonitor(commandMessage -> {
            return new MessageMonitor.MonitorCallback() {
                public void reportSuccess() {
                    successCounter.incrementAndGet();
                }

                public void reportFailure(Throwable cause) {
                    failureCounter.incrementAndGet();
                }

                public void reportIgnored() {
                    ignoredCounter.incrementAndGet();
                }
            };
        }));
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
        testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);
        stubHandler.setRepository(testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class)));

        String secondAggregateIdentifier = UUID.randomUUID().toString();
        // Same sequence for both aggregates, first aggregate completely before the second.
        for (String targetIdentifier : Arrays.asList(aggregateIdentifier, secondAggregateIdentifier)) {
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new CreateCommand(targetIdentifier)));
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand(targetIdentifier)));
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new ErrorCommand(targetIdentifier)));
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand(targetIdentifier)));
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand(targetIdentifier)));
        }
        testSubject.stop();

        TestCase.assertEquals(8L, successCounter.get());
        TestCase.assertEquals(2L, failureCounter.get());
        TestCase.assertEquals(0L, ignoredCounter.get());
    }

    /**
     * Dispatching on a bus that has already been stopped is expected to throw an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testCommandRejectedAfterShutdown() throws InterruptedException {
        testSubject = new DisruptorCommandBus(eventStore);
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        stubHandler.setRepository(testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class)));
        testSubject.stop();

        Object payload = new Object();
        testSubject.dispatch(GenericCommandMessage.asCommandMessage(payload));
    }

    /**
     * Dispatches COMMAND_COUNT commands to a bus with default configuration and waits
     * up to five seconds for the event store latch, which counts one per published
     * event, to reach zero.
     */
    @Test(timeout = 10000)
    public void testCommandProcessedAndEventsStored() throws InterruptedException {
        testSubject = new DisruptorCommandBus(eventStore);
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        stubHandler.setRepository(testSubject.createRepository(new GenericAggregateFactory(StubAggregate.class)));

        int remaining = COMMAND_COUNT;
        while (remaining > 0) {
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new StubCommand(aggregateIdentifier)));
            remaining--;
        }

        CountDownLatch publishedEvents = eventStore.countDownLatch;
        // The result of await is not needed: the assertion below reports a timeout.
        publishedEvents.await(5L, TimeUnit.SECONDS);
        TestCase.assertEquals("Seems that some events are not stored", 0L, publishedEvents.getCount());
    }
}
