package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.axonframework.common.Assert;
import org.axonframework.common.Registration;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/AbstractEventBus.class */
public abstract class AbstractEventBus implements EventBus {
    private static final Logger logger = LoggerFactory.getLogger(AbstractEventBus.class);
    final String eventsKey;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final Set<Consumer<List<? extends EventMessage<?>>>> eventProcessors;
    private final Set<MessageDispatchInterceptor<EventMessage<?>>> dispatchInterceptors;

    public AbstractEventBus() {
        this(NoOpMessageMonitor.INSTANCE);
    }

    public AbstractEventBus(MessageMonitor<? super EventMessage<?>> messageMonitor) {
        this.eventsKey = this + "_EVENTS";
        this.eventProcessors = new CopyOnWriteArraySet();
        this.dispatchInterceptors = new CopyOnWriteArraySet();
        this.messageMonitor = messageMonitor;
    }

    @Override // org.axonframework.eventhandling.EventBus
    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        if (!this.eventProcessors.add(consumer)) {
            logger.info("EventProcessor [{}] not added. It was already subscribed", consumer);
        } else if (logger.isDebugEnabled()) {
            logger.debug("EventProcessor [{}] subscribed successfully", consumer);
        }
        return () -> {
            if (!this.eventProcessors.remove(consumer)) {
                logger.info("EventListener {} not removed. It was already unsubscribed", consumer);
                return false;
            }
            if (!logger.isDebugEnabled()) {
                return true;
            }
            logger.debug("EventListener {} unsubscribed successfully", consumer);
            return true;
        };
    }

    @Override // org.axonframework.eventhandling.EventBus
    public Registration registerDispatchInterceptor(MessageDispatchInterceptor<EventMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> {
            return this.dispatchInterceptors.remove(messageDispatchInterceptor);
        };
    }

    @Override // org.axonframework.eventhandling.EventBus
    public void publish(List<? extends EventMessage<?>> list) {
        if (!CurrentUnitOfWork.isStarted()) {
            prepareCommit(intercept(list));
            commit(list);
            afterCommit(list);
        } else {
            UnitOfWork<?> unitOfWork = CurrentUnitOfWork.get();
            Assert.state(!unitOfWork.phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT), "It is not allowed to publish events when the current Unit of Work has already been committed. Please start a new Unit of Work before publishing events.");
            Assert.state(!unitOfWork.root().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT), "It is not allowed to publish events when the root Unit of Work has already been committed.");
            ((List) unitOfWork.getOrComputeResource(this.eventsKey, str -> {
                ArrayList arrayList = new ArrayList();
                unitOfWork.onPrepareCommit(unitOfWork2 -> {
                    if (!unitOfWork2.parent().isPresent() || unitOfWork2.root().phase().isAfter(UnitOfWork.Phase.PREPARE_COMMIT)) {
                        doWithEvents(this::prepareCommit, intercept(arrayList));
                    } else {
                        unitOfWork2.root().onPrepareCommit(unitOfWork2 -> {
                            doWithEvents(this::prepareCommit, intercept(arrayList));
                        });
                    }
                });
                unitOfWork.onCommit(unitOfWork3 -> {
                    if (!unitOfWork3.parent().isPresent() || unitOfWork3.root().phase().isAfter(UnitOfWork.Phase.COMMIT)) {
                        doWithEvents(this::commit, arrayList);
                    } else {
                        unitOfWork3.root().onCommit(unitOfWork3 -> {
                            doWithEvents(this::commit, arrayList);
                        });
                    }
                });
                unitOfWork.afterCommit(unitOfWork4 -> {
                    if (!unitOfWork4.parent().isPresent() || unitOfWork4.root().phase().isAfter(UnitOfWork.Phase.AFTER_COMMIT)) {
                        doWithEvents(this::afterCommit, arrayList);
                    } else {
                        unitOfWork4.root().afterCommit(unitOfWork4 -> {
                            doWithEvents(this::afterCommit, arrayList);
                        });
                    }
                });
                return arrayList;
            })).addAll(list);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected List<? extends EventMessage<?>> intercept(List<? extends EventMessage<?>> list) {
        ArrayList arrayList = new ArrayList(list);
        Iterator<MessageDispatchInterceptor<EventMessage<?>>> it = this.dispatchInterceptors.iterator();
        while (it.hasNext()) {
            BiFunction<Integer, EventMessage<?>, EventMessage<?>> handle = it.next().handle(arrayList);
            for (int i = 0; i < arrayList.size(); i++) {
                arrayList.set(i, handle.apply(Integer.valueOf(i), arrayList.get(i)));
            }
        }
        return arrayList;
    }

    private void doWithEvents(Consumer<List<? extends EventMessage<?>>> consumer, List<? extends EventMessage<?>> list) {
        if (CurrentUnitOfWork.isStarted()) {
            CurrentUnitOfWork.get().resources().remove(this.eventsKey);
        }
        consumer.accept(list);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepareCommit(List<? extends EventMessage<?>> list) {
        MessageMonitor<? super EventMessage<?>> messageMonitor = this.messageMonitor;
        messageMonitor.getClass();
        list.forEach((v1) -> {
            r1.onMessageIngested(v1);
        });
        this.eventProcessors.forEach(consumer -> {
            consumer.accept(list);
        });
    }

    protected void commit(List<? extends EventMessage<?>> list) {
    }

    protected void afterCommit(List<? extends EventMessage<?>> list) {
    }
}
