package org.axonframework.eventhandling.replay;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.axonframework.common.DirectExecutor;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.EventMessage;
import org.axonframework.eventhandling.Cluster;
import org.axonframework.eventhandling.ClusterMetaData;
import org.axonframework.eventhandling.EventListener;
import org.axonframework.eventhandling.EventProcessingMonitor;
import org.axonframework.eventstore.EventVisitor;
import org.axonframework.eventstore.management.Criteria;
import org.axonframework.eventstore.management.CriteriaBuilder;
import org.axonframework.eventstore.management.EventStoreManagement;
import org.axonframework.unitofwork.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster.class */
public class ReplayingCluster implements Cluster {
    private static final Logger logger = LoggerFactory.getLogger(ReplayingCluster.class);
    private final Cluster delegate;
    private final EventStoreManagement replayingEventStore;
    private final TransactionManager transactionManager;
    private final int commitThreshold;
    private final IncomingMessageHandler incomingMessageHandler;
    private final Set<ReplayAware> replayAwareListeners = new CopyOnWriteArraySet();
    private volatile Status status = Status.LIVE;
    private final EventProcessingListeners eventHandlingListeners = new EventProcessingListeners();

    /* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster$EventProcessingListeners.class */
    private final class EventProcessingListeners implements EventProcessingMonitor {
        private final Set<EventProcessingMonitor> delegates;

        private EventProcessingListeners() {
            this.delegates = new CopyOnWriteArraySet();
        }

        @Override // org.axonframework.eventhandling.EventProcessingMonitor
        public void onEventProcessingCompleted(List<? extends EventMessage> list) {
            if (ReplayingCluster.this.status != Status.REPLAYING) {
                Iterator<EventProcessingMonitor> it = this.delegates.iterator();
                while (it.hasNext()) {
                    it.next().onEventProcessingCompleted(list);
                }
            }
        }

        @Override // org.axonframework.eventhandling.EventProcessingMonitor
        public void onEventProcessingFailed(List<? extends EventMessage> list, Throwable th) {
            if (ReplayingCluster.this.status != Status.REPLAYING) {
                Iterator<EventProcessingMonitor> it = this.delegates.iterator();
                while (it.hasNext()) {
                    it.next().onEventProcessingFailed(list, th);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster$ReplayEventsTask.class */
    public class ReplayEventsTask implements Runnable {
        private final Criteria criteria;

        /* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster$ReplayEventsTask$ReplayingEventVisitor.class */
        private class ReplayingEventVisitor implements EventVisitor {
            private int eventCounter = 0;
            private Object currentTransaction;

            public ReplayingEventVisitor(Object obj) {
                this.currentTransaction = obj;
            }

            @Override // org.axonframework.eventstore.EventVisitor
            public void doWithEvent(DomainEventMessage domainEventMessage) {
                if (ReplayingCluster.this.commitThreshold > 0) {
                    int i = this.eventCounter + 1;
                    this.eventCounter = i;
                    if (i > ReplayingCluster.this.commitThreshold) {
                        this.eventCounter = 0;
                        ReplayingCluster.logger.trace("Replay batch size reached; committing Replay Transaction");
                        ReplayingCluster.this.transactionManager.commitTransaction(this.currentTransaction);
                        ReplayingCluster.logger.trace("Starting new Replay Transaction for next batch");
                        this.currentTransaction = ReplayingCluster.this.transactionManager.startTransaction();
                    }
                }
                ReplayingCluster.this.delegate.publish(domainEventMessage);
                List<EventMessage> releaseMessage = ReplayingCluster.this.incomingMessageHandler.releaseMessage(ReplayingCluster.this.delegate, domainEventMessage);
                if (releaseMessage == null || releaseMessage.isEmpty()) {
                    return;
                }
                ReplayingCluster.this.eventHandlingListeners.onEventProcessingCompleted(releaseMessage);
            }

            public Object getTransaction() {
                return this.currentTransaction;
            }
        }

        public ReplayEventsTask(Criteria criteria) {
            this.criteria = criteria;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            ReplayingCluster.this.incomingMessageHandler.prepareForReplay(ReplayingCluster.this.delegate);
            ReplayingCluster.this.status = Status.REPLAYING;
            ReplayingCluster.logger.trace("Cluster set to replay mode");
            Object startTransaction = ReplayingCluster.this.transactionManager.startTransaction();
            ReplayingCluster.logger.trace("Started new transaction for event replay");
            ReplayingEventVisitor replayingEventVisitor = new ReplayingEventVisitor(startTransaction);
            try {
                try {
                    ReplayingCluster.logger.trace("Notifying replay aware listeners 'beforeReplay'");
                    Iterator it = ReplayingCluster.this.replayAwareListeners.iterator();
                    while (it.hasNext()) {
                        ((ReplayAware) it.next()).beforeReplay();
                    }
                    if (this.criteria != null) {
                        ReplayingCluster.logger.trace("Starting visiting events using criteria");
                        ReplayingCluster.this.replayingEventStore.visitEvents(this.criteria, replayingEventVisitor);
                    } else {
                        ReplayingCluster.logger.trace("Starting visiting events without criteria");
                        ReplayingCluster.this.replayingEventStore.visitEvents(replayingEventVisitor);
                    }
                    ReplayingCluster.logger.trace("Notifying replay aware listeners 'afterReplay'");
                    Iterator it2 = ReplayingCluster.this.replayAwareListeners.iterator();
                    while (it2.hasNext()) {
                        ((ReplayAware) it2.next()).afterReplay();
                    }
                    ReplayingCluster.this.status = Status.PROCESSING_BACKLOG;
                    ReplayingCluster.logger.trace("Processing backlog of messages");
                    ReplayingCluster.this.incomingMessageHandler.processBacklog(ReplayingCluster.this.delegate);
                    ReplayingCluster.logger.trace("Committing transaction");
                    ReplayingCluster.this.transactionManager.commitTransaction(replayingEventVisitor.getTransaction());
                    ReplayingCluster.logger.info("Replay ended. Switching back to live mode");
                    ReplayingCluster.this.status = Status.LIVE;
                } catch (Throwable th) {
                    try {
                        ReplayingCluster.logger.error("Replay failed due to an exception.", th);
                        ReplayingCluster.this.incomingMessageHandler.onReplayFailed(ReplayingCluster.this.delegate, th);
                        ReplayingCluster.logger.trace("Notifying replay aware listeners 'replayFailed'");
                        Iterator it3 = ReplayingCluster.this.replayAwareListeners.iterator();
                        while (it3.hasNext()) {
                            ((ReplayAware) it3.next()).onReplayFailed(th);
                        }
                        ReplayingCluster.logger.trace("Rolling back replay transaction");
                        ReplayingCluster.this.transactionManager.rollbackTransaction(replayingEventVisitor.getTransaction());
                        throw new ReplayFailedException("Replay failed due to an exception.", th);
                    } catch (Throwable th2) {
                        ReplayingCluster.logger.trace("Rolling back replay transaction");
                        ReplayingCluster.this.transactionManager.rollbackTransaction(replayingEventVisitor.getTransaction());
                        throw th2;
                    }
                }
            } catch (Throwable th3) {
                ReplayingCluster.logger.info("Replay ended. Switching back to live mode");
                ReplayingCluster.this.status = Status.LIVE;
                throw th3;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/replay/ReplayingCluster$Status.class */
    public enum Status {
        LIVE,
        REPLAYING,
        PROCESSING_BACKLOG
    }

    public ReplayingCluster(Cluster cluster, EventStoreManagement eventStoreManagement, TransactionManager transactionManager, int i, IncomingMessageHandler incomingMessageHandler) {
        this.delegate = cluster;
        this.replayingEventStore = eventStoreManagement;
        this.transactionManager = transactionManager;
        this.commitThreshold = i;
        this.incomingMessageHandler = incomingMessageHandler;
        this.delegate.subscribeEventProcessingMonitor(this.eventHandlingListeners);
    }

    public CriteriaBuilder newCriteriaBuilder() {
        return this.replayingEventStore.newCriteriaBuilder();
    }

    public void startReplay() {
        startReplay((Criteria) null);
    }

    public void startReplay(Criteria criteria) {
        try {
            startReplay(DirectExecutor.INSTANCE, criteria).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ReplayFailedException("Replay failed because it was interrupted", e);
        } catch (ExecutionException e2) {
            if (!(e2.getCause() instanceof ReplayFailedException)) {
                throw new ReplayFailedException("Replay failed due to an exception.", e2.getCause());
            }
            throw ((ReplayFailedException) e2.getCause());
        }
    }

    public Future<Void> startReplay(Executor executor) {
        return startReplay(executor, null);
    }

    public Future<Void> startReplay(Executor executor, Criteria criteria) {
        FutureTask futureTask = new FutureTask(new ReplayEventsTask(criteria), null);
        executor.execute(futureTask);
        return futureTask;
    }

    public boolean isInReplayMode() {
        return this.status != Status.LIVE;
    }

    @Override // org.axonframework.eventhandling.Cluster
    public String getName() {
        return this.delegate.getName();
    }

    @Override // org.axonframework.eventhandling.Cluster
    public void publish(EventMessage... eventMessageArr) {
        if (this.status == Status.LIVE) {
            this.delegate.publish(eventMessageArr);
            return;
        }
        logger.debug("Cluster is in replaying: sending message to process backlog");
        List<EventMessage> onIncomingMessages = this.incomingMessageHandler.onIncomingMessages(this.delegate, eventMessageArr);
        if (onIncomingMessages == null || onIncomingMessages.isEmpty()) {
            return;
        }
        this.eventHandlingListeners.onEventProcessingCompleted(onIncomingMessages);
    }

    @Override // org.axonframework.eventhandling.Cluster
    public void subscribe(EventListener eventListener) {
        this.delegate.subscribe(eventListener);
        if (eventListener instanceof ReplayAware) {
            this.replayAwareListeners.add((ReplayAware) eventListener);
        }
    }

    @Override // org.axonframework.eventhandling.Cluster
    public void unsubscribe(EventListener eventListener) {
        if (eventListener instanceof ReplayAware) {
            this.replayAwareListeners.remove(eventListener);
        }
        this.delegate.unsubscribe(eventListener);
    }

    @Override // org.axonframework.eventhandling.Cluster
    public Set<EventListener> getMembers() {
        return this.delegate.getMembers();
    }

    @Override // org.axonframework.eventhandling.Cluster
    public ClusterMetaData getMetaData() {
        return this.delegate.getMetaData();
    }

    @Override // org.axonframework.eventhandling.EventProcessingMonitorSupport
    public void subscribeEventProcessingMonitor(EventProcessingMonitor eventProcessingMonitor) {
        this.eventHandlingListeners.delegates.add(eventProcessingMonitor);
    }

    @Override // org.axonframework.eventhandling.EventProcessingMonitorSupport
    public void unsubscribeEventProcessingMonitor(EventProcessingMonitor eventProcessingMonitor) {
        this.eventHandlingListeners.delegates.remove(eventProcessingMonitor);
    }
}
