package org.axonframework.commandhandling.disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandDispatchInterceptor;
import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.commandhandling.CommandHandlerInterceptor;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandTargetResolver;
import org.axonframework.commandhandling.disruptor.CommandHandlerInvoker;
import org.axonframework.commandhandling.disruptor.CommandHandlingEntry;
import org.axonframework.commandhandling.interceptors.SerializationOptimizingInterceptor;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventsourcing.AggregateFactory;
import org.axonframework.eventsourcing.EventSourcedAggregateRoot;
import org.axonframework.eventstore.EventStore;
import org.axonframework.repository.Repository;
import org.axonframework.serializer.Serializer;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus.class */
public class DisruptorCommandBus implements CommandBus {
    private static final Logger logger = LoggerFactory.getLogger(DisruptorCommandBus.class);
    private static final ThreadGroup DISRUPTOR_THREAD_GROUP = new ThreadGroup("DisruptorCommandBus");
    private final ConcurrentMap<String, CommandHandler<?>> commandHandlers;
    private final Disruptor<CommandHandlingEntry> disruptor;
    private final CommandHandlerInvoker[] commandHandlerInvokers;
    private final List<CommandDispatchInterceptor> dispatchInterceptors;
    private final List<CommandHandlerInterceptor> invokerInterceptors;
    private final List<CommandHandlerInterceptor> publisherInterceptors;
    private final ExecutorService executorService;
    private final boolean rescheduleOnCorruptState;
    private volatile boolean started;
    private volatile boolean disruptorShutDown;
    private final long coolingDownPeriod;
    private final CommandTargetResolver commandTargetResolver;
    private final int publisherCount;
    private final int serializerCount;

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$DisruptorRepository.class */
    private static class DisruptorRepository<T extends EventSourcedAggregateRoot> implements Repository<T> {
        private final String typeIdentifier;

        public DisruptorRepository(String str) {
            this.typeIdentifier = str;
        }

        @Override // org.axonframework.repository.Repository
        public T load(Object obj, Long l) {
            return (T) CommandHandlerInvoker.getRepository(this.typeIdentifier).load(obj, l);
        }

        @Override // org.axonframework.repository.Repository
        public T load(Object obj) {
            return (T) CommandHandlerInvoker.getRepository(this.typeIdentifier).load(obj);
        }

        @Override // org.axonframework.repository.Repository
        public void add(T t) {
            CommandHandlerInvoker.getRepository(this.typeIdentifier).add((CommandHandlerInvoker.DisruptorRepository) t);
        }
    }

    /* loaded from: input_file:org/axonframework/commandhandling/disruptor/DisruptorCommandBus$ExceptionHandler.class */
    private class ExceptionHandler implements com.lmax.disruptor.ExceptionHandler {
        private ExceptionHandler() {
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            DisruptorCommandBus.logger.error("Exception occurred while processing a {}.", ((CommandHandlingEntry) obj).getCommand().getPayloadType().getSimpleName(), th);
        }

        public void handleOnStartException(Throwable th) {
            DisruptorCommandBus.logger.error("Failed to start the DisruptorCommandBus.", th);
            DisruptorCommandBus.this.disruptor.shutdown();
        }

        public void handleOnShutdownException(Throwable th) {
            DisruptorCommandBus.logger.error("Error while shutting down the DisruptorCommandBus", th);
        }
    }

    public DisruptorCommandBus(EventStore eventStore, EventBus eventBus) {
        this(eventStore, eventBus, new DisruptorConfiguration());
    }

    public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration disruptorConfiguration) {
        this.commandHandlers = new ConcurrentHashMap();
        this.started = true;
        this.disruptorShutDown = false;
        Executor executor = disruptorConfiguration.getExecutor();
        if (executor == null) {
            this.executorService = Executors.newCachedThreadPool(new AxonThreadFactory(DISRUPTOR_THREAD_GROUP));
            executor = this.executorService;
        } else {
            this.executorService = null;
        }
        this.rescheduleOnCorruptState = disruptorConfiguration.getRescheduleCommandsOnCorruptState();
        this.invokerInterceptors = new ArrayList(disruptorConfiguration.getInvokerInterceptors());
        this.publisherInterceptors = new ArrayList(disruptorConfiguration.getPublisherInterceptors());
        this.dispatchInterceptors = new ArrayList(disruptorConfiguration.getDispatchInterceptors());
        TransactionManager transactionManager = disruptorConfiguration.getTransactionManager();
        this.disruptor = new Disruptor<>(new CommandHandlingEntry.Factory(disruptorConfiguration.getTransactionManager() != null), executor, disruptorConfiguration.getClaimStrategy(), disruptorConfiguration.getWaitStrategy());
        this.commandTargetResolver = disruptorConfiguration.getCommandTargetResolver();
        this.commandHandlerInvokers = initializeInvokerThreads(eventStore, disruptorConfiguration);
        SerializerHandler[] initializeSerializerThreads = initializeSerializerThreads(disruptorConfiguration);
        this.serializerCount = initializeSerializerThreads.length;
        EventPublisher[] initializePublisherThreads = initializePublisherThreads(eventStore, eventBus, disruptorConfiguration, executor, transactionManager);
        this.publisherCount = initializePublisherThreads.length;
        this.disruptor.handleExceptionsWith(new ExceptionHandler());
        EventHandlerGroup handleEventsWith = this.disruptor.handleEventsWith(this.commandHandlerInvokers);
        if (initializeSerializerThreads.length > 0) {
            handleEventsWith = handleEventsWith.then(initializeSerializerThreads);
            this.invokerInterceptors.add(new SerializationOptimizingInterceptor());
        }
        handleEventsWith.then(initializePublisherThreads);
        this.coolingDownPeriod = disruptorConfiguration.getCoolingDownPeriod();
        this.disruptor.start();
    }

    private EventPublisher[] initializePublisherThreads(EventStore eventStore, EventBus eventBus, DisruptorConfiguration disruptorConfiguration, Executor executor, TransactionManager transactionManager) {
        EventPublisher[] eventPublisherArr = new EventPublisher[disruptorConfiguration.getPublisherThreadCount()];
        for (int i = 0; i < eventPublisherArr.length; i++) {
            eventPublisherArr[i] = new EventPublisher(eventStore, eventBus, executor, transactionManager, disruptorConfiguration.getRollbackConfiguration(), i);
        }
        return eventPublisherArr;
    }

    private SerializerHandler[] initializeSerializerThreads(DisruptorConfiguration disruptorConfiguration) {
        if (!disruptorConfiguration.isPreSerializationConfigured()) {
            return new SerializerHandler[0];
        }
        Serializer serializer = disruptorConfiguration.getSerializer();
        SerializerHandler[] serializerHandlerArr = new SerializerHandler[disruptorConfiguration.getSerializerThreadCount()];
        for (int i = 0; i < serializerHandlerArr.length; i++) {
            serializerHandlerArr[i] = new SerializerHandler(serializer, i, disruptorConfiguration.getSerializedRepresentation());
        }
        return serializerHandlerArr;
    }

    private CommandHandlerInvoker[] initializeInvokerThreads(EventStore eventStore, DisruptorConfiguration disruptorConfiguration) {
        CommandHandlerInvoker[] commandHandlerInvokerArr = new CommandHandlerInvoker[disruptorConfiguration.getInvokerThreadCount()];
        for (int i = 0; i < commandHandlerInvokerArr.length; i++) {
            commandHandlerInvokerArr[i] = new CommandHandlerInvoker(eventStore, disruptorConfiguration.getCache(), i);
        }
        return commandHandlerInvokerArr;
    }

    @Override // org.axonframework.commandhandling.CommandBus
    public void dispatch(CommandMessage<?> commandMessage) {
        dispatch(commandMessage, null);
    }

    @Override // org.axonframework.commandhandling.CommandBus
    public <R> void dispatch(CommandMessage<?> commandMessage, CommandCallback<R> commandCallback) {
        Assert.state(this.started, "CommandBus has been shut down. It is not accepting any Commands");
        CommandMessage<?> commandMessage2 = commandMessage;
        Iterator<CommandDispatchInterceptor> it = this.dispatchInterceptors.iterator();
        while (it.hasNext()) {
            commandMessage2 = it.next().handle(commandMessage2);
        }
        doDispatch(commandMessage2, commandCallback);
    }

    public <R> void doDispatch(CommandMessage commandMessage, CommandCallback<R> commandCallback) {
        Object identifier;
        Assert.state(!this.disruptorShutDown, "Disruptor has been shut down. Cannot dispatch or re-dispatch commands");
        RingBuffer ringBuffer = this.disruptor.getRingBuffer();
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        if ((this.commandHandlerInvokers.length > 1 || this.publisherCount > 1 || this.serializerCount > 0) && (identifier = this.commandTargetResolver.resolveTarget(commandMessage).getIdentifier()) != null) {
            int hashCode = identifier.hashCode() & Integer.MAX_VALUE;
            if (this.commandHandlerInvokers.length > 1) {
                i = hashCode % this.commandHandlerInvokers.length;
            }
            if (this.serializerCount > 1) {
                i3 = hashCode % this.serializerCount;
            }
            if (this.publisherCount > 1) {
                i2 = hashCode % this.publisherCount;
            }
        }
        long next = ringBuffer.next();
        ((CommandHandlingEntry) ringBuffer.get(next)).reset(commandMessage, this.commandHandlers.get(commandMessage.getCommandName()), i, i2, i3, new BlacklistDetectingCallback(commandCallback, commandMessage, this.disruptor.getRingBuffer(), this, this.rescheduleOnCorruptState), this.invokerInterceptors, this.publisherInterceptors);
        ringBuffer.publish(next);
    }

    public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory) {
        for (CommandHandlerInvoker commandHandlerInvoker : this.commandHandlerInvokers) {
            commandHandlerInvoker.createRepository(aggregateFactory);
        }
        return new DisruptorRepository(aggregateFactory.getTypeIdentifier());
    }

    @Override // org.axonframework.commandhandling.CommandBus
    public <C> void subscribe(String str, CommandHandler<? super C> commandHandler) {
        this.commandHandlers.put(str, commandHandler);
    }

    @Override // org.axonframework.commandhandling.CommandBus
    public <C> boolean unsubscribe(String str, CommandHandler<? super C> commandHandler) {
        return this.commandHandlers.remove(str, commandHandler);
    }

    public void stop() {
        if (this.started) {
            this.started = false;
            long currentTimeMillis = System.currentTimeMillis();
            long cursor = this.disruptor.getRingBuffer().getCursor();
            while (System.currentTimeMillis() - currentTimeMillis < this.coolingDownPeriod && !Thread.interrupted()) {
                if (this.disruptor.getRingBuffer().getCursor() != cursor) {
                    currentTimeMillis = System.currentTimeMillis();
                    cursor = this.disruptor.getRingBuffer().getCursor();
                }
            }
            this.disruptorShutDown = true;
            this.disruptor.shutdown();
            if (this.executorService != null) {
                this.executorService.shutdown();
            }
        }
    }
}
