package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.MonitorAwareCallback;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.model.Aggregate;
import org.axonframework.commandhandling.model.Repository;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.Priority;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.DomainEventMessage;
import org.axonframework.eventsourcing.EventStreamDecorator;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus.class */
public class DisruptorCommandBus implements CommandBus {
    // Logger shared by the bus and its nested helper classes.
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    // Thread group handed to the AxonThreadFactory when the bus has to create its own executor.
    private static final ThreadGroup DISRUPTOR_THREAD_GROUP = new ThreadGroup("DisruptorCommandBus");
    // Subscribed handlers, keyed by command name; looked up on every dispatch.
    private final ConcurrentMap<String, MessageHandler<? super CommandMessage<?>>> commandHandlers;
    // The disruptor whose ring buffer carries the CommandHandlingEntry instances.
    private final Disruptor<CommandHandlingEntry> disruptor;
    // First stage of the disruptor pipeline; the array length also drives invoker segment selection.
    private final CommandHandlerInvoker[] commandHandlerInvokers;
    // Applied in order to each command before it is placed on the ring buffer.
    private final List<MessageDispatchInterceptor<? super CommandMessage<?>>> dispatchInterceptors;
    // Copied from the configuration and handed to every CommandHandlingEntry on reset.
    private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> invokerInterceptors;
    // Copied from the configuration and handed to every CommandHandlingEntry on reset.
    private final List<MessageHandlerInterceptor<? super CommandMessage<?>>> publisherInterceptors;
    // Non-null only when the configuration supplied no executor and the bus created one itself;
    // in that case stop() shuts it down.
    private final ExecutorService executorService;
    // Passed through to each BlacklistDetectingCallback created in doDispatch.
    private final boolean rescheduleOnCorruptState;
    // Compared in stop() against elapsed System.currentTimeMillis() since the last cursor movement,
    // i.e. expressed in milliseconds.
    private final long coolingDownPeriod;
    // Resolves the target identifier whose hash selects the invoker and publisher segments.
    private final CommandTargetResolver commandTargetResolver;
    // Number of EventPublisher instances created in the constructor (second pipeline stage).
    private final int publisherCount;
    // True from construction until stop() is called; dispatch(...) refuses commands once false.
    private volatile boolean started;
    // Set by stop() right before the disruptor is shut down; doDispatch(...) refuses commands once true.
    private volatile boolean disruptorShutDown;
    // Notified of every command passed to dispatch(...); its callback is wrapped in a MonitorAwareCallback.
    private final MessageMonitor<? super CommandMessage<?>> messageMonitor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$DisruptorRepository.class */
    /**
     * Repository facade returned by {@code createRepository}. It holds no aggregate state itself: every
     * call looks up the repository registered for the aggregate type via
     * {@code CommandHandlerInvoker.getRepository(Class)} and forwards to it.
     *
     * @param <T> the aggregate type this repository serves
     */
    public static class DisruptorRepository<T> implements Repository<T> {
        private final Class<T> type;

        /**
         * @param cls the aggregate type used for the repository lookup on each call
         */
        public DisruptorRepository(Class<T> cls) {
            this.type = cls;
        }

        /**
         * Forwards to the looked-up repository.
         *
         * @param str the aggregate identifier
         * @param l   the version argument, passed through unchanged
         * @return whatever the looked-up repository returns
         */
        @Override
        public Aggregate<T> load(String str, Long l) {
            return delegate().load(str, l);
        }

        /**
         * Forwards to the looked-up repository.
         *
         * @param str the aggregate identifier
         * @return whatever the looked-up repository returns
         */
        @Override
        public Aggregate<T> load(String str) {
            return delegate().load(str);
        }

        /**
         * Forwards to the looked-up repository.
         *
         * @param callable factory passed through unchanged
         * @return whatever the looked-up repository returns
         * @throws Exception whatever the looked-up repository throws
         */
        @Override
        public Aggregate<T> newInstance(Callable<T> callable) throws Exception {
            return delegate().newInstance(callable);
        }

        /** Looks up the backing repository anew on every call; nothing is cached here. */
        private Repository<T> delegate() {
            return CommandHandlerInvoker.getRepository(this.type);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$ExceptionHandler.class */
    private class ExceptionHandler implements com.lmax.disruptor.ExceptionHandler {
        private ExceptionHandler() {
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            DisruptorCommandBus.logger.error("Exception occurred while processing a {}.", ((CommandHandlingEntry) obj).getMessage().getPayloadType().getSimpleName(), th);
        }

        public void handleOnStartException(Throwable th) {
            DisruptorCommandBus.logger.error("Failed to start the DisruptorCommandBus.", th);
            DisruptorCommandBus.this.disruptor.shutdown();
        }

        public void handleOnShutdownException(Throwable th) {
            DisruptorCommandBus.logger.error("Error while shutting down the DisruptorCommandBus", th);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$FailureLoggingCommandCallback.class */
    private static class FailureLoggingCommandCallback implements CommandCallback<Object, Object> {
        private static final FailureLoggingCommandCallback INSTANCE = new FailureLoggingCommandCallback();

        private FailureLoggingCommandCallback() {
        }

        @Override // org.axonframework.commandhandling.CommandCallback
        public void onSuccess(CommandMessage<? extends Object> commandMessage, Object obj) {
        }

        @Override // org.axonframework.commandhandling.CommandCallback
        public void onFailure(CommandMessage<? extends Object> commandMessage, Throwable th) {
            DisruptorCommandBus.logger.info("An error occurred while handling a command [{}].", commandMessage.getCommandName(), th);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$NoOpEventStreamDecorator.class */
    /**
     * Decorator that leaves event streams untouched. Used as the default when a repository is created
     * without an explicit decorator.
     */
    private static class NoOpEventStreamDecorator implements EventStreamDecorator {
        /** Single shared instance; the class has no state. */
        public static final EventStreamDecorator INSTANCE = new NoOpEventStreamDecorator();

        private NoOpEventStreamDecorator() {
        }

        /**
         * @param aggregate the aggregate the events belong to (ignored)
         * @param list      the events about to be appended
         * @return {@code list}, unchanged
         */
        @Override
        public List<DomainEventMessage<?>> decorateForAppend(Aggregate<?> aggregate, List<DomainEventMessage<?>> list) {
            return list;
        }

        /**
         * @param str               the aggregate identifier (ignored)
         * @param domainEventStream the stream being read
         * @return {@code domainEventStream}, unchanged
         */
        @Override
        public DomainEventStream decorateForRead(String str, DomainEventStream domainEventStream) {
            return domainEventStream;
        }
    }

    /**
     * Creates a bus with a freshly constructed {@code DisruptorConfiguration}, i.e. whatever defaults
     * that class provides.
     *
     * @param eventStore the event store handed to the command handler invokers; must not be null
     */
    public DisruptorCommandBus(EventStore eventStore) {
        // Delegate to the full constructor, which validates the arguments and starts the disruptor.
        this(eventStore, new DisruptorConfiguration());
    }

    /**
     * Creates and starts a bus using the given configuration.
     * <p>
     * When the configuration supplies no executor, the bus creates its own cached thread pool and takes
     * responsibility for shutting it down in {@link #stop()}; a caller-supplied executor is never shut
     * down by the bus. The disruptor is wired as invokers first, then publishers, and is started before
     * this constructor returns.
     *
     * @param eventStore             the event store handed to the command handler invokers; must not be null
     * @param disruptorConfiguration the configuration to read all settings from; must not be null
     */
    public DisruptorCommandBus(EventStore eventStore, DisruptorConfiguration disruptorConfiguration) {
        // Validate before allocating anything.
        Assert.notNull(eventStore, "eventStore may not be null");
        Assert.notNull(disruptorConfiguration, "configuration may not be null");
        this.commandHandlers = new ConcurrentHashMap<>();
        this.started = true;
        this.disruptorShutDown = false;
        Executor executor = disruptorConfiguration.getExecutor();
        if (executor == null) {
            // No executor configured: create one we own (and therefore shut down in stop()).
            this.executorService = Executors.newCachedThreadPool(new AxonThreadFactory(DISRUPTOR_THREAD_GROUP));
            executor = this.executorService;
        } else {
            this.executorService = null;
        }
        this.rescheduleOnCorruptState = disruptorConfiguration.getRescheduleCommandsOnCorruptState();
        // Defensive copies, so later changes to the configuration's lists do not affect this bus.
        this.invokerInterceptors = new ArrayList<>(disruptorConfiguration.getInvokerInterceptors());
        this.publisherInterceptors = new ArrayList<>(disruptorConfiguration.getPublisherInterceptors());
        this.dispatchInterceptors = new ArrayList<>(disruptorConfiguration.getDispatchInterceptors());
        TransactionManager transactionManager = disruptorConfiguration.getTransactionManager();
        this.disruptor = new Disruptor<>(CommandHandlingEntry::new, disruptorConfiguration.getBufferSize(), executor, disruptorConfiguration.getProducerType(), disruptorConfiguration.getWaitStrategy());
        this.commandTargetResolver = disruptorConfiguration.getCommandTargetResolver();
        this.commandHandlerInvokers = initializeInvokerThreads(eventStore, disruptorConfiguration);
        EventPublisher[] publishers = initializePublisherThreads(disruptorConfiguration, executor, transactionManager);
        this.messageMonitor = disruptorConfiguration.getMessageMonitor();
        this.publisherCount = publishers.length;
        // The exception handler must be installed before the event handlers are registered and started.
        this.disruptor.setDefaultExceptionHandler(new ExceptionHandler());
        this.disruptor.handleEventsWith(this.commandHandlerInvokers).then(publishers);
        this.coolingDownPeriod = disruptorConfiguration.getCoolingDownPeriod();
        this.disruptor.start();
    }

    /**
     * Builds one {@code EventPublisher} per configured publisher thread.
     *
     * @param disruptorConfiguration source of the publisher thread count and rollback configuration
     * @param executor               executor handed to every publisher
     * @param transactionManager     transaction manager handed to every publisher
     * @return the publishers, in index order
     */
    private EventPublisher[] initializePublisherThreads(DisruptorConfiguration disruptorConfiguration, Executor executor, TransactionManager transactionManager) {
        final int publisherThreads = disruptorConfiguration.getPublisherThreadCount();
        final EventPublisher[] publishers = new EventPublisher[publisherThreads];
        // The array index is passed as the last constructor argument (presumably the segment id — confirm).
        for (int index = 0; index < publisherThreads; index++) {
            publishers[index] = new EventPublisher(executor, transactionManager, disruptorConfiguration.getRollbackConfiguration(), index);
        }
        return publishers;
    }

    /**
     * Builds one {@code CommandHandlerInvoker} per configured invoker thread.
     *
     * @param eventStore             event store handed to every invoker
     * @param disruptorConfiguration source of the invoker thread count and the cache
     * @return the invokers, in index order
     */
    private CommandHandlerInvoker[] initializeInvokerThreads(EventStore eventStore, DisruptorConfiguration disruptorConfiguration) {
        final int invokerThreads = disruptorConfiguration.getInvokerThreadCount();
        final CommandHandlerInvoker[] invokers = new CommandHandlerInvoker[invokerThreads];
        // The array index is passed as the last constructor argument (presumably the segment id — confirm).
        for (int index = 0; index < invokerThreads; index++) {
            invokers[index] = new CommandHandlerInvoker(eventStore, disruptorConfiguration.getCache(), index);
        }
        return invokers;
    }

    /**
     * Dispatches a command without a caller-supplied callback. The shared
     * {@code FailureLoggingCommandCallback} instance is used instead, so failures are only logged.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            the payload type of the command
     */
    @Override
    public <C> void dispatch(CommandMessage<C> commandMessage) {
        // Same path as the two-argument overload, with the logging-only callback.
        this.dispatch(commandMessage, FailureLoggingCommandCallback.INSTANCE);
    }

    /**
     * Runs the command through the dispatch interceptors, registers the original (pre-interceptor)
     * command with the message monitor, and places the resulting message on the ring buffer.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback the callback to notify of the outcome; wrapped so that the monitor is
     *                        informed as well
     * @param <C>             the payload type of the command
     * @param <R>             the result type expected by the callback
     * @throws IllegalStateException if the bus has been stopped (raised by {@code Assert.state})
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <C, R> void dispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) {
        Assert.state(this.started, "CommandBus has been shut down. It is not accepting any Commands");
        CommandMessage<C> intercepted = commandMessage;
        for (MessageDispatchInterceptor<? super CommandMessage<?>> interceptor : this.dispatchInterceptors) {
            // Pass the message itself to the interceptor; only the interceptor's result needs a cast
            // back to CommandMessage.
            intercepted = (CommandMessage) interceptor.handle(intercepted);
        }
        doDispatch(intercepted, new MonitorAwareCallback(commandCallback, this.messageMonitor.onMessageIngested(commandMessage)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Places the command on the ring buffer without applying dispatch interceptors or touching the
     * message monitor. Also used as the re-dispatch entry point: the bus passes itself to each
     * {@code BlacklistDetectingCallback} it creates.
     * <p>
     * When more than one invoker or publisher exists, the target identifier resolved for the command is
     * hashed to pick a segment for each stage, so commands for the same identifier always map to the
     * same segments. A {@code null} identifier, or a single-threaded stage, uses segment 0.
     *
     * @param commandMessage  the command to place on the ring buffer
     * @param commandCallback the callback to notify of the outcome
     * @param <C>             the payload type of the command
     * @param <R>             the result type expected by the callback
     * @throws IllegalStateException        if the disruptor has been shut down (raised by {@code Assert.state})
     * @throws NoHandlerForCommandException if no handler is subscribed to the command's name
     */
    public <C, R> void doDispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, R> commandCallback) {
        Assert.state(!this.disruptorShutDown, "Disruptor has been shut down. Cannot dispatch or re-dispatch commands");
        final String commandName = commandMessage.getCommandName();
        final MessageHandler<? super CommandMessage<?>> messageHandler = this.commandHandlers.get(commandName);
        if (messageHandler == null) {
            throw new NoHandlerForCommandException(String.format("No handler was subscribed to command [%s]", commandName));
        }
        final RingBuffer<CommandHandlingEntry> ringBuffer = this.disruptor.getRingBuffer();
        final int invokerCount = this.commandHandlerInvokers.length;
        int invokerSegment = 0;
        int publisherSegment = 0;
        // Only resolve the target when the result can actually influence segment selection.
        if (invokerCount > 1 || this.publisherCount > 1) {
            final String identifier = this.commandTargetResolver.resolveTarget(commandMessage).getIdentifier();
            if (identifier != null) {
                // Clear the sign bit so the modulo below can never yield a negative segment.
                final int idHash = identifier.hashCode() & Integer.MAX_VALUE;
                if (invokerCount > 1) {
                    invokerSegment = idHash % invokerCount;
                }
                if (this.publisherCount > 1) {
                    publisherSegment = idHash % this.publisherCount;
                }
            }
        }
        final long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).reset(commandMessage, messageHandler, invokerSegment, publisherSegment, new BlacklistDetectingCallback(commandCallback, ringBuffer, this, this.rescheduleOnCorruptState), this.invokerInterceptors, this.publisherInterceptors);
        } finally {
            // A claimed sequence must always be published, even if preparing the entry failed;
            // otherwise the consumers would stall waiting for it.
            ringBuffer.publish(sequence);
        }
    }

    /**
     * Creates a repository for the given factory's aggregate type without any event stream decoration
     * (the no-op decorator is used).
     *
     * @param aggregateFactory the factory registered with every command handler invoker
     * @param <T>              the aggregate type
     * @return a repository for use by command handlers subscribed to this bus
     */
    public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory) {
        final EventStreamDecorator noDecoration = NoOpEventStreamDecorator.INSTANCE;
        return createRepository(aggregateFactory, noDecoration);
    }

    /**
     * Registers the aggregate factory and decorator with every command handler invoker and returns a
     * repository facade for the factory's aggregate type.
     *
     * @param aggregateFactory     the factory passed to each invoker
     * @param eventStreamDecorator the decorator passed to each invoker
     * @param <T>                  the aggregate type
     * @return a {@code DisruptorRepository} bound to {@code aggregateFactory.getAggregateType()}
     */
    public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator eventStreamDecorator) {
        // Every invoker needs its own registration.
        for (CommandHandlerInvoker invoker : this.commandHandlerInvokers) {
            invoker.createRepository(aggregateFactory, eventStreamDecorator);
        }
        return new DisruptorRepository<>(aggregateFactory.getAggregateType());
    }

    /**
     * Subscribes the handler to the given command name, replacing any handler previously registered
     * under that name.
     *
     * @param str            the command name
     * @param messageHandler the handler to invoke for commands with that name
     * @return a registration whose cancellation removes the mapping only if the name is still mapped
     *         to this same handler
     */
    @Override
    public Registration subscribe(String str, MessageHandler<? super CommandMessage<?>> messageHandler) {
        this.commandHandlers.put(str, messageHandler);
        // Two-argument remove: a handler that has since been replaced is left untouched.
        return () -> this.commandHandlers.remove(str, messageHandler);
    }

    /**
     * Stops the bus. New commands are refused immediately; the calling thread then waits until the ring
     * buffer cursor has not moved for {@code coolingDownPeriod} milliseconds (or the thread is
     * interrupted), after which the disruptor is shut down, followed by the executor if the bus created
     * it. Calling this method on an already stopped bus does nothing.
     * <p>
     * The wait is a spin loop on the calling thread. The thread's interrupt status is only observed,
     * never cleared, so an interrupt that cuts the wait short is still visible to the caller afterwards.
     */
    public void stop() {
        if (!this.started) {
            return;
        }
        this.started = false;
        long lastChangeDetected = System.currentTimeMillis();
        long lastKnownCursor = this.disruptor.getRingBuffer().getCursor();
        // Each cursor movement restarts the cooling-down window, giving in-flight commands
        // (including re-dispatched ones) time to be placed on the ring buffer.
        while (System.currentTimeMillis() - lastChangeDetected < this.coolingDownPeriod
                && !Thread.currentThread().isInterrupted()) {
            final long currentCursor = this.disruptor.getRingBuffer().getCursor();
            if (currentCursor != lastKnownCursor) {
                lastChangeDetected = System.currentTimeMillis();
                lastKnownCursor = currentCursor;
            }
        }
        // From here on doDispatch(...) rejects commands, including re-dispatches.
        this.disruptorShutDown = true;
        this.disruptor.shutdown();
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }
}
