package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.TargetAggregateIdentifier;
import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.AggregateIdentifier;
import org.axonframework.commandhandling.model.AggregateLifecycle;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.IdentifierFactory;
import org.axonframework.common.MockException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.eventsourcing.GenericAggregateFactory;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Exercises {@link DisruptorCommandBus} configured with several invoker and publisher threads while a large number
 * of commands is dispatched to a fixed set of aggregates.
 */
public class DisruptorCommandBusTest_MultiThreaded {
    // Number of dispatch rounds in the test; every round sends one command to each aggregate.
    private static final int COMMAND_COUNT = 100;
    // Number of aggregate identifiers generated in setUp() and targeted by the test.
    private static final int AGGREGATE_COUNT = 10;
    // Command handler subscribed to the bus for every command type used by the test.
    private StubHandler stubHandler;
    // Event store handed to the command bus; exposes counters the test asserts on.
    private InMemoryEventStore inMemoryEventStore;
    // The command bus under test; created inside the test method and stopped in tearDown().
    private DisruptorCommandBus testSubject;
    // Identifiers of the aggregates the commands are addressed to (length AGGREGATE_COUNT).
    private String[] aggregateIdentifier;

    /**
     * {@link EventBus} stub whose only working operation is {@code publish}: each invocation counts the internal
     * latch (initialised to {@code COMMAND_COUNT}) down by one. Every other operation is unsupported.
     */
    private static class CountingEventBus implements EventBus {
        private final CountDownLatch publisherCountDown = new CountDownLatch(COMMAND_COUNT);

        /** Registers one publication, regardless of how many events the batch holds. */
        public void publish(List<? extends EventMessage<?>> events) {
            publisherCountDown.countDown();
        }

        /** Not supported by this stub. */
        public TrackingEventStream streamEvents(TrackingToken token) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Registration subscribe(Consumer<List<? extends EventMessage<?>>> eventProcessor) {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Registration registerDispatchInterceptor(MessageDispatchInterceptor<EventMessage<?>> interceptor) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Command that makes {@code StubHandler} create a new aggregate with the given identifier.
     */
    private static class CreateCommand extends StubCommand {

        /**
         * @param aggregateId identifier of the aggregate to create
         */
        private CreateCommand(Object aggregateId) {
            super(aggregateId);
        }
    }

    /**
     * Command that makes {@code StubHandler} invoke {@code createFailingEvent()} on the targeted aggregate.
     */
    private static class ErrorCommand extends StubCommand {

        /**
         * @param aggregateId identifier of the aggregate that should apply a failing event
         */
        private ErrorCommand(Object aggregateId) {
            super(aggregateId);
        }
    }

    /**
     * Command carrying an exception; {@code StubHandler} throws that exception instead of touching an aggregate.
     */
    public static class ExceptionCommand extends StubCommand {
        private final Exception exception;

        /**
         * @param aggregateId identifier of the targeted aggregate
         * @param toThrow     exception handed back by {@link #getException()}
         */
        public ExceptionCommand(Object aggregateId, Exception toThrow) {
            super(aggregateId);
            exception = toThrow;
        }

        /**
         * @return the exception supplied at construction time
         */
        public Exception getException() {
            return exception;
        }
    }

    /**
     * Marker event payload; {@code InMemoryEventStore} refuses to commit any event whose payload type is this class.
     */
    private static class FailingEvent {

        /** Only instantiated from within this test class. */
        private FailingEvent() {
            // marker type: carries no state
        }
    }

    /**
     * Minimal in-memory {@link EventStore} for this test.
     * <p>
     * Per aggregate identifier only the last event of the most recently committed batch is retained. Two counters
     * are exposed to the test: {@code storedEventCounter} (events seen by {@code commit}) and {@code loadCounter}
     * (calls to {@code readEvents}).
     */
    private static class InMemoryEventStore extends AbstractEventBus implements EventStore {
        private final Map<String, DomainEventMessage<?>> storedEvents = new ConcurrentHashMap<>();
        private final AtomicInteger storedEventCounter = new AtomicInteger();
        private final AtomicInteger loadCounter = new AtomicInteger();

        private InMemoryEventStore() {
        }

        /**
         * Counts every event of the batch and remembers the last one under the aggregate identifier of the first.
         * A {@code null} or empty batch is ignored.
         *
         * @param list the events to commit; each element is expected to be a {@link DomainEventMessage}
         * @throws MockException when an event's payload type is {@link FailingEvent}; events up to and including
         *                       the failing one have already been counted, but nothing from the batch is stored
         */
        protected void commit(List<? extends EventMessage<?>> list) {
            if (list == null || list.isEmpty()) {
                return;
            }
            // The aggregate identifier is only available on DomainEventMessage, hence the explicit cast.
            String aggregateIdentifier = ((DomainEventMessage<?>) list.get(0)).getAggregateIdentifier();
            DomainEventMessage<?> lastEvent = null;
            for (EventMessage<?> eventMessage : list) {
                // Count first, so that rejected events are included in the counter as well.
                storedEventCounter.incrementAndGet();
                lastEvent = (DomainEventMessage<?>) eventMessage;
                if (FailingEvent.class.isAssignableFrom(lastEvent.getPayloadType())) {
                    throw new MockException("This is a failing event. EventStore refuses to store that");
                }
            }
            storedEvents.put(aggregateIdentifier, lastEvent);
        }

        /**
         * Returns a stream holding the single retained event of the given aggregate, or an empty stream when
         * nothing has been stored for it. Every call increments {@code loadCounter}.
         *
         * @param str the aggregate identifier
         * @return stream with zero or one event
         */
        public DomainEventStream readEvents(String str) {
            loadCounter.incrementAndGet();
            DomainEventMessage<?> stored = storedEvents.get(str);
            if (stored == null) {
                return DomainEventStream.of(new DomainEventMessage<?>[0]);
            }
            return DomainEventStream.of(stored);
        }

        /** Snapshots are ignored by this store. */
        public void storeSnapshot(DomainEventMessage<?> domainEventMessage) {
        }

        /** Not supported by this store. */
        public TrackingEventStream streamEvents(TrackingToken trackingToken) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Aggregate used by the test. Creating it with an identifier applies one {@code SomethingDoneEvent};
     * {@link #doSomething()} applies another and {@link #createFailingEvent()} applies a {@code FailingEvent}.
     */
    public static class StubAggregate {

        @AggregateIdentifier
        private String identifier;

        /** No-argument constructor; leaves the identifier unset. */
        public StubAggregate() {
        }

        /**
         * Creates the aggregate with the given identifier and immediately applies a {@code SomethingDoneEvent}.
         *
         * @param aggregateId identifier assigned to the new aggregate
         */
        private StubAggregate(String aggregateId) {
            identifier = aggregateId;
            AggregateLifecycle.apply(new SomethingDoneEvent());
        }

        /**
         * @return the identifier of this aggregate
         */
        public String getIdentifier() {
            return identifier;
        }

        /** Applies a {@code SomethingDoneEvent}. */
        public void doSomething() {
            AggregateLifecycle.apply(new SomethingDoneEvent());
        }

        /** Applies a {@code FailingEvent}. */
        public void createFailingEvent() {
            AggregateLifecycle.apply(new FailingEvent());
        }

        /**
         * Copies the aggregate identifier carried by the handled event into this instance.
         *
         * @param eventMessage the handled event; cast to {@link DomainEventMessage}
         */
        @EventSourcingHandler
        protected void handle(EventMessage eventMessage) {
            DomainEventMessage domainEvent = (DomainEventMessage) eventMessage;
            identifier = domainEvent.getAggregateIdentifier();
        }
    }

    /**
     * Base command of this test; holds the identifier of the aggregate it is addressed to.
     */
    public static class StubCommand {

        @TargetAggregateIdentifier
        private Object aggregateIdentifier;

        /**
         * @param targetId identifier of the aggregate this command targets
         */
        private StubCommand(Object targetId) {
            aggregateIdentifier = targetId;
        }

        /**
         * @return the identifier of the targeted aggregate
         */
        public Object getAggregateIdentifier() {
            return aggregateIdentifier;
        }
    }

    /**
     * Command handler shared by all command types of this test. The behaviour depends on the payload type:
     * <ul>
     * <li>{@link ExceptionCommand}: throws the exception carried by the command;</li>
     * <li>{@link CreateCommand}: creates a new {@link StubAggregate} and calls {@code doSomething()} on it;</li>
     * <li>{@link ErrorCommand}: loads the aggregate and calls {@code createFailingEvent()};</li>
     * <li>any other {@link StubCommand}: loads the aggregate and calls {@code doSomething()}.</li>
     * </ul>
     * The repository must be provided through {@link #setRepository(Repository)} before commands are handled.
     */
    private static class StubHandler implements MessageHandler<CommandMessage<?>> {
        private Repository<StubAggregate> repository;

        private StubHandler() {
        }

        /**
         * Handles a single command as described on the class.
         *
         * @param commandMessage the command to handle; its payload must be a {@link StubCommand}
         * @return always {@code Void.TYPE}
         * @throws Exception the exception carried by an {@link ExceptionCommand}, or whatever the repository or
         *                   aggregate invocation raises
         */
        public Object handle(CommandMessage<?> commandMessage) throws Exception {
            Class<?> payloadType = commandMessage.getPayloadType();
            StubCommand stubCommand = (StubCommand) commandMessage.getPayload();
            if (ExceptionCommand.class.isAssignableFrom(payloadType)) {
                throw ((ExceptionCommand) stubCommand).getException();
            }
            if (CreateCommand.class.isAssignableFrom(payloadType)) {
                repository.newInstance(() -> new StubAggregate(stubCommand.getAggregateIdentifier().toString()))
                          .execute(StubAggregate::doSomething);
                return Void.TYPE;
            }
            // Parameterised type is required: on a raw Aggregate the consumer would receive a plain Object.
            Aggregate<StubAggregate> aggregate = repository.load(stubCommand.getAggregateIdentifier().toString());
            if (ErrorCommand.class.isAssignableFrom(payloadType)) {
                aggregate.execute(StubAggregate::createFailingEvent);
            } else {
                aggregate.execute(StubAggregate::doSomething);
            }
            return Void.TYPE;
        }

        /**
         * @param repository the repository used to create and load aggregates
         */
        public void setRepository(Repository<StubAggregate> repository) {
            this.repository = repository;
        }
    }

    /**
     * Prepares the fixtures of a test run: a fresh command handler, a fresh in-memory event store and
     * {@code AGGREGATE_COUNT} newly generated aggregate identifiers.
     */
    @Before
    public void setUp() throws Exception {
        stubHandler = new StubHandler();
        inMemoryEventStore = new InMemoryEventStore();

        String[] identifiers = new String[AGGREGATE_COUNT];
        for (int index = 0; index < identifiers.length; index++) {
            identifiers[index] = IdentifierFactory.getInstance().generateIdentifier();
        }
        aggregateIdentifier = identifiers;
    }

    /**
     * Stops the command bus created by the test, if any.
     * <p>
     * The bus is only assigned inside the test method, so it is still {@code null} when the test failed before
     * (or while) constructing it. Guarding against that keeps the original failure from being accompanied by an
     * unrelated {@link NullPointerException}.
     */
    @After
    public void tearDown() {
        if (testSubject != null) {
            testSubject.stop();
        }
    }

    /**
     * Dispatches one {@link CreateCommand} per aggregate, followed by {@code COMMAND_COUNT} rounds in which every
     * aggregate receives one command. In a single round all aggregates receive an {@link ErrorCommand}, whose
     * event is rejected by the event store; in all other rounds they receive a plain {@link StubCommand}.
     * <p>
     * Expected outcome after the bus has been stopped:
     * <ul>
     * <li>every creation contributes two events (one applied by the aggregate constructor, one by
     * {@code doSomething()}) and every later command one event; rejected events are counted too, because the
     * store increments its counter before it refuses the event;</li>
     * <li>the callback reports one failure per aggregate and success for all remaining commands;</li>
     * <li>the event store is read once per aggregate and the spy sees two distinct aggregate instances per
     * aggregate. NOTE(review): presumably the read and the second instance result from reloading an aggregate
     * after its failed command, with all other loads served from a cache; confirm against
     * {@code DisruptorCommandBus}.</li>
     * </ul>
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testDispatchLargeNumberCommandForDifferentAggregates() throws Exception {
        // Index of the round in which ErrorCommands are sent. It merely happens to equal AGGREGATE_COUNT.
        final int failingRound = 10;
        final int totalCommands = COMMAND_COUNT * AGGREGATE_COUNT;

        testSubject = new DisruptorCommandBus(inMemoryEventStore, new DisruptorConfiguration()
                .setBufferSize(4)
                .setProducerType(ProducerType.MULTI)
                .setWaitStrategy(new SleepingWaitStrategy())
                .setRollbackConfiguration(RollbackConfigurationType.ANY_THROWABLE)
                .setInvokerThreadCount(2)
                .setPublisherThreadCount(3));
        testSubject.subscribe(StubCommand.class.getName(), stubHandler);
        testSubject.subscribe(CreateCommand.class.getName(), stubHandler);
        testSubject.subscribe(ErrorCommand.class.getName(), stubHandler);

        Repository<StubAggregate> repository =
                Mockito.spy(testSubject.createRepository(new GenericAggregateFactory<>(StubAggregate.class)));
        stubHandler.setRepository(repository);

        // Records every aggregate instance handed out by the repository, keyed by the instance itself.
        // NOTE(review): holding these references presumably also keeps the instances from being
        // garbage-collected while the test runs; confirm.
        Map<Object, Object> handedOutAggregates = new ConcurrentHashMap<>();
        Mockito.doAnswer(invocation -> {
            Object aggregate = invocation.callRealMethod();
            handedOutAggregates.put(aggregate, new Object());
            return aggregate;
        }).when(repository).newInstance(Mockito.any());
        Mockito.doAnswer(invocation -> {
            Object aggregate = invocation.callRealMethod();
            handedOutAggregates.put(aggregate, new Object());
            return aggregate;
        }).when(repository).load(Mockito.isA(String.class));

        for (String id : aggregateIdentifier) {
            testSubject.dispatch(GenericCommandMessage.asCommandMessage(new CreateCommand(id)));
        }

        CommandCallback<Object, Object> callback = Mockito.mock(CommandCallback.class);
        for (int round = 0; round < COMMAND_COUNT; round++) {
            for (String id : aggregateIdentifier) {
                CommandMessage<Object> command;
                if (round == failingRound) {
                    command = GenericCommandMessage.asCommandMessage(new ErrorCommand(id));
                } else {
                    command = GenericCommandMessage.asCommandMessage(new StubCommand(id));
                }
                testSubject.dispatch(command, callback);
            }
        }
        testSubject.stop();

        Assert.assertEquals(AGGREGATE_COUNT, inMemoryEventStore.loadCounter.get());
        Assert.assertEquals(2 * AGGREGATE_COUNT, handedOutAggregates.size());
        Assert.assertEquals(totalCommands + 2 * AGGREGATE_COUNT, inMemoryEventStore.storedEventCounter.get());
        Mockito.verify(callback, Mockito.times(totalCommands - AGGREGATE_COUNT))
               .onSuccess(Mockito.any(), Mockito.any());
        Mockito.verify(callback, Mockito.times(AGGREGATE_COUNT))
               .onFailure(Mockito.any(), Mockito.isA(RuntimeException.class));
    }
}
