package org.axonframework.eventhandling.async;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.Assert;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.AbstractCluster;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.EventListenerOrderComparator;
import org.axonframework.eventhandling.MultiplexingEventProcessingMonitor;
import org.axonframework.eventhandling.OrderResolver;
import org.axonframework.eventhandling.async.EventProcessor;
import org.axonframework.unitofwork.DefaultUnitOfWorkFactory;
import org.axonframework.unitofwork.TransactionManager;
import org.axonframework.unitofwork.UnitOfWorkFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousCluster.class */
/**
 * Cluster that hands each published event to an {@link EventProcessor} backed by the configured
 * {@link Executor}, instead of invoking the event listeners directly in {@code doPublish}.
 * <p>
 * The {@link SequencingPolicy} decides how events are grouped: events for which the policy returns the same
 * non-null sequence identifier are routed to the same {@link EventProcessor}, while events for which it returns
 * {@code null} each get a fresh processor of their own.
 */
public class AsynchronousCluster extends AbstractCluster {
    private static final Logger logger = LoggerFactory.getLogger(AsynchronousCluster.class);

    private final Executor executor;
    private final ErrorHandler errorHandler;
    /** Processors currently registered per sequence identifier. */
    private final ConcurrentMap<Object, EventProcessor> currentSchedulers =
            new ConcurrentHashMap<Object, EventProcessor>();
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final UnitOfWorkFactory unitOfWorkFactory;

    /**
     * Shutdown callback that does nothing. Used for processors that are not registered in
     * {@code currentSchedulers} and therefore need no clean-up.
     * <p>
     * NOTE(review): the decompiler reports that this class was originally declared {@code private}.
     */
    public static class NoActionCallback implements EventProcessor.ShutdownCallback {
        private NoActionCallback() {
        }

        @Override
        public void afterShutdown(EventProcessor eventProcessor) {
            // intentionally empty
        }
    }

    /**
     * Shutdown callback that removes the processor registered for a given sequence identifier from
     * {@code currentSchedulers} once that processor reports it has shut down.
     * <p>
     * NOTE(review): the decompiler reports that this class was originally declared {@code private}.
     */
    public final class SchedulerCleanUp implements EventProcessor.ShutdownCallback {
        private final Object sequenceIdentifier;

        private SchedulerCleanUp(Object sequenceIdentifier) {
            this.sequenceIdentifier = sequenceIdentifier;
        }

        @Override
        public void afterShutdown(EventProcessor eventProcessor) {
            // Pass the identifier itself so toString() only runs when the message is actually rendered.
            logger.debug("Cleaning up processing scheduler for sequence [{}]", sequenceIdentifier);
            // Conditional remove: only drops the entry if it still maps to the processor that shut down.
            currentSchedulers.remove(sequenceIdentifier, eventProcessor);
        }
    }

    /**
     * Creates a cluster that uses a {@link DefaultUnitOfWorkFactory} built around the given transaction manager
     * and a {@link DefaultErrorHandler} configured with {@code RetryPolicy.retryAfter(2, TimeUnit.SECONDS)}.
     *
     * @param str                the value passed to the {@link AbstractCluster} constructor
     * @param executor           the executor handed to each {@link EventProcessor}
     * @param transactionManager the transaction manager wrapped in a {@link DefaultUnitOfWorkFactory}
     * @param sequencingPolicy   the policy that provides the sequence identifier for each event
     */
    public AsynchronousCluster(String str, Executor executor, TransactionManager transactionManager, SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
        this(str, executor, transactionManager, sequencingPolicy, new DefaultErrorHandler(RetryPolicy.retryAfter(2, TimeUnit.SECONDS)));
    }

    /**
     * Creates a cluster that uses a no-argument {@link DefaultUnitOfWorkFactory} and a
     * {@link DefaultErrorHandler} configured with {@code RetryPolicy.proceed()}.
     *
     * @param str              the value passed to the {@link AbstractCluster} constructor
     * @param executor         the executor handed to each {@link EventProcessor}
     * @param sequencingPolicy the policy that provides the sequence identifier for each event
     */
    public AsynchronousCluster(String str, Executor executor, SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
        this(str, executor, new DefaultUnitOfWorkFactory(), sequencingPolicy, new DefaultErrorHandler(RetryPolicy.proceed()));
    }

    /**
     * Creates a cluster that uses a {@link DefaultUnitOfWorkFactory} built around the given transaction manager
     * and the given error handler.
     *
     * @param str                the value passed to the {@link AbstractCluster} constructor
     * @param executor           the executor handed to each {@link EventProcessor}
     * @param transactionManager the transaction manager wrapped in a {@link DefaultUnitOfWorkFactory}
     * @param sequencingPolicy   the policy that provides the sequence identifier for each event
     * @param errorHandler       the error handler handed to each {@link EventProcessor}
     */
    public AsynchronousCluster(String str, Executor executor, TransactionManager transactionManager, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, ErrorHandler errorHandler) {
        this(str, executor, new DefaultUnitOfWorkFactory(transactionManager), sequencingPolicy, errorHandler);
    }

    /**
     * Creates a cluster with explicitly provided collaborators.
     * <p>
     * {@code errorHandler}, {@code unitOfWorkFactory} and {@code sequencingPolicy} are validated with
     * {@code Assert.notNull}; {@code executor} is stored without validation.
     *
     * @param str               the value passed to the {@link AbstractCluster} constructor
     * @param executor          the executor handed to each {@link EventProcessor}
     * @param unitOfWorkFactory the factory handed to each {@link EventProcessor}
     * @param sequencingPolicy  the policy that provides the sequence identifier for each event
     * @param errorHandler      the error handler handed to each {@link EventProcessor}
     */
    public AsynchronousCluster(String str, Executor executor, UnitOfWorkFactory unitOfWorkFactory, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, ErrorHandler errorHandler) {
        super(str);
        Assert.notNull(errorHandler, "errorHandler may not be null");
        Assert.notNull(unitOfWorkFactory, "unitOfWorkFactory may not be null");
        Assert.notNull(sequencingPolicy, "sequencingPolicy may not be null");
        this.errorHandler = errorHandler;
        this.executor = executor;
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.sequencingPolicy = sequencingPolicy;
    }

    /**
     * Creates a cluster with explicitly provided collaborators, passing an
     * {@link EventListenerOrderComparator} based on the given {@code orderResolver} to the superclass.
     * <p>
     * {@code errorHandler}, {@code unitOfWorkFactory} and {@code sequencingPolicy} are validated with
     * {@code Assert.notNull}; {@code executor} is stored without validation.
     *
     * @param str               the value passed to the {@link AbstractCluster} constructor
     * @param executor          the executor handed to each {@link EventProcessor}
     * @param unitOfWorkFactory the factory handed to each {@link EventProcessor}
     * @param sequencingPolicy  the policy that provides the sequence identifier for each event
     * @param errorHandler      the error handler handed to each {@link EventProcessor}
     * @param orderResolver     the resolver wrapped in the {@link EventListenerOrderComparator}
     */
    public AsynchronousCluster(String str, Executor executor, UnitOfWorkFactory unitOfWorkFactory, SequencingPolicy<? super EventMessage<?>> sequencingPolicy, ErrorHandler errorHandler, OrderResolver orderResolver) {
        super(str, new EventListenerOrderComparator(orderResolver));
        Assert.notNull(errorHandler, "errorHandler may not be null");
        Assert.notNull(unitOfWorkFactory, "unitOfWorkFactory may not be null");
        Assert.notNull(sequencingPolicy, "sequencingPolicy may not be null");
        this.errorHandler = errorHandler;
        this.executor = executor;
        this.unitOfWorkFactory = unitOfWorkFactory;
        this.sequencingPolicy = sequencingPolicy;
    }

    /**
     * Schedules every event in {@code list}, in iteration order, via
     * {@link #schedule(EventMessage, MultiplexingEventProcessingMonitor)}.
     * <p>
     * The {@code set} argument is not used here; processors are created with {@code getMembers()} instead.
     *
     * @param list                               the events to schedule
     * @param set                                the listeners supplied by the caller (unused)
     * @param multiplexingEventProcessingMonitor the monitor handed to the processors
     */
    @Override
    protected void doPublish(List<EventMessage> list, Set<EventListener> set, MultiplexingEventProcessingMonitor multiplexingEventProcessingMonitor) {
        for (EventMessage event : list) {
            schedule(event, multiplexingEventProcessingMonitor);
        }
    }

    /**
     * Schedules a single event. If the sequencing policy returns {@code null} for the event, a new processor
     * with a {@link NoActionCallback} is created for it alone; otherwise the event is assigned to the processor
     * registered for the returned sequence identifier.
     *
     * @param eventMessage                       the event to schedule
     * @param multiplexingEventProcessingMonitor the monitor handed to the processor
     */
    protected void schedule(EventMessage<?> eventMessage, MultiplexingEventProcessingMonitor multiplexingEventProcessingMonitor) {
        final Object sequenceIdentifier = sequencingPolicy.getSequenceIdentifierFor(eventMessage);
        if (sequenceIdentifier == null) {
            logger.debug("Scheduling Event for full concurrent processing {}", eventMessage.getClass().getSimpleName());
            EventProcessor processor = newProcessingScheduler(new NoActionCallback(), getMembers(), multiplexingEventProcessingMonitor);
            processor.scheduleEvent(eventMessage);
        } else {
            // Pass the identifier itself so toString() only runs when the message is actually rendered.
            logger.debug("Scheduling task of type [{}] for sequential processing in group [{}]", eventMessage.getClass().getSimpleName(), sequenceIdentifier);
            assignEventToScheduler(eventMessage, sequenceIdentifier, multiplexingEventProcessingMonitor);
        }
    }

    /**
     * Hands the event to the processor registered for {@code sequenceIdentifier}, retrying until a processor
     * accepts it (i.e. {@code scheduleEvent} returns {@code true}).
     * <p>
     * When no processor is registered, a new one is offered via {@code putIfAbsent} and the loop looks the
     * entry up again, so that concurrent callers end up using whichever processor was actually registered.
     * When the registered processor refuses the event, its mapping is removed and the loop retries.
     */
    private void assignEventToScheduler(EventMessage<?> eventMessage, Object sequenceIdentifier, MultiplexingEventProcessingMonitor multiplexingEventProcessingMonitor) {
        boolean scheduled = false;
        while (!scheduled) {
            EventProcessor current = currentSchedulers.get(sequenceIdentifier);
            if (current == null) {
                currentSchedulers.putIfAbsent(sequenceIdentifier, newProcessingScheduler(new SchedulerCleanUp(sequenceIdentifier), getMembers(), multiplexingEventProcessingMonitor));
            } else {
                scheduled = current.scheduleEvent(eventMessage);
                if (!scheduled) {
                    // Presumably the processor is shutting down; drop it (only if still mapped) and retry.
                    currentSchedulers.remove(sequenceIdentifier, current);
                }
            }
        }
    }

    /**
     * Creates a new {@link EventProcessor} using this cluster's executor, error handler and unit of work factory.
     *
     * @param shutdownCallback                   the callback handed to the processor
     * @param set                                the event listeners handed to the processor
     * @param multiplexingEventProcessingMonitor the monitor handed to the processor
     * @return the newly created processor
     */
    protected EventProcessor newProcessingScheduler(EventProcessor.ShutdownCallback shutdownCallback, Set<EventListener> set, MultiplexingEventProcessingMonitor multiplexingEventProcessingMonitor) {
        logger.debug("Initializing new processing scheduler.");
        return new EventProcessor(executor, shutdownCallback, errorHandler, unitOfWorkFactory, set, multiplexingEventProcessingMonitor);
    }
}
