package org.axonframework.eventhandling.async;

import jakarta.annotation.Nonnull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessingStrategy;
import org.axonframework.eventhandling.async.EventProcessorTask;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousEventProcessingStrategy.class */
public class AsynchronousEventProcessingStrategy implements EventProcessingStrategy {
    private static final Logger logger = LoggerFactory.getLogger(AsynchronousEventProcessingStrategy.class);
    private final Executor executor;
    private final SequencingPolicy<? super EventMessage<?>> sequencingPolicy;
    private final String scheduledEventsKey = this + "_SCHEDULED_EVENTS";
    private final ConcurrentMap<Object, EventProcessorTask> currentTasks = new ConcurrentHashMap();

    /* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousEventProcessingStrategy$NoActionCallback.class */
    private enum NoActionCallback implements EventProcessorTask.ShutdownCallback {
        INSTANCE;

        @Override // org.axonframework.eventhandling.async.EventProcessorTask.ShutdownCallback
        public void afterShutdown(EventProcessorTask eventProcessorTask) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousEventProcessingStrategy$SchedulerCleanUp.class */
    public final class SchedulerCleanUp implements EventProcessorTask.ShutdownCallback {
        private final Object sequenceIdentifier;

        private SchedulerCleanUp(Object obj) {
            this.sequenceIdentifier = obj;
        }

        @Override // org.axonframework.eventhandling.async.EventProcessorTask.ShutdownCallback
        public void afterShutdown(EventProcessorTask eventProcessorTask) {
            if (AsynchronousEventProcessingStrategy.logger.isDebugEnabled()) {
                AsynchronousEventProcessingStrategy.logger.debug("Cleaning up processing scheduler for sequence [{}]", this.sequenceIdentifier.toString());
            }
            AsynchronousEventProcessingStrategy.this.currentTasks.remove(this.sequenceIdentifier, eventProcessorTask);
        }
    }

    public AsynchronousEventProcessingStrategy(Executor executor, SequencingPolicy<? super EventMessage<?>> sequencingPolicy) {
        this.executor = (Executor) Objects.requireNonNull(executor);
        this.sequencingPolicy = (SequencingPolicy) Objects.requireNonNull(sequencingPolicy);
    }

    @Override // org.axonframework.eventhandling.EventProcessingStrategy
    public void handle(@Nonnull List<? extends EventMessage<?>> list, @Nonnull Consumer<List<? extends EventMessage<?>>> consumer) {
        if (!CurrentUnitOfWork.isStarted()) {
            schedule(list, consumer);
        } else {
            UnitOfWork<?> root = CurrentUnitOfWork.get().root();
            ((List) root.getOrComputeResource(this.scheduledEventsKey, str -> {
                ArrayList arrayList = new ArrayList();
                root.afterCommit(unitOfWork -> {
                    schedule(arrayList, consumer);
                });
                return arrayList;
            })).addAll(list);
        }
    }

    protected void schedule(List<? extends EventMessage<?>> list, Consumer<List<? extends EventMessage<?>>> consumer) {
        HashMap hashMap = new HashMap();
        for (EventMessage<?> eventMessage : list) {
            ((List) hashMap.computeIfAbsent(this.sequencingPolicy.getSequenceIdentifierFor(eventMessage), obj -> {
                return new ArrayList();
            })).add(eventMessage);
        }
        hashMap.forEach((obj2, list2) -> {
            if (obj2 == null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Scheduling Event for full concurrent processing {}", list.getClass().getSimpleName());
                }
                newProcessingScheduler(NoActionCallback.INSTANCE).scheduleEvents(list2, consumer);
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Scheduling task of type [{}] for sequential processing in group [{}]", list.getClass().getSimpleName(), obj2.toString());
                }
                assignEventsToScheduler(list2, obj2, consumer);
            }
        });
    }

    private void assignEventsToScheduler(List<? extends EventMessage<?>> list, Object obj, Consumer<List<? extends EventMessage<?>>> consumer) {
        boolean z = false;
        while (!z) {
            EventProcessorTask eventProcessorTask = this.currentTasks.get(obj);
            if (eventProcessorTask == null) {
                this.currentTasks.putIfAbsent(obj, newProcessingScheduler(new SchedulerCleanUp(obj)));
            } else {
                z = eventProcessorTask.scheduleEvents(list, consumer);
                if (!z) {
                    this.currentTasks.remove(obj, eventProcessorTask);
                }
            }
        }
    }

    protected EventProcessorTask newProcessingScheduler(EventProcessorTask.ShutdownCallback shutdownCallback) {
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing new processing scheduler.");
        }
        return new EventProcessorTask(this.executor, shutdownCallback);
    }
}
