package org.axonframework.eventhandling;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventsourcing.eventstore.TrackingEventStream;
import org.axonframework.eventsourcing.eventstore.TrackingToken;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/TrackingEventProcessor.class */
/**
 * Event processor that reads events from an {@link EventBus} through a
 * {@link TrackingEventStream} on its own single-threaded executor and hands them, in batches,
 * to {@code doProcessBatch}.
 *
 * <p>The stream is opened at the token fetched from the {@link TokenStore} under this
 * processor's name and segment {@code 0}. When the unit of work for the last message of a
 * batch is about to commit, that message's tracking token is written back to the token store.
 */
public class TrackingEventProcessor extends AbstractEventProcessor {
    private static final Logger logger = LoggerFactory.getLogger(TrackingEventProcessor.class);
    private static final ThreadGroup threadGroup = new ThreadGroup(TrackingEventProcessor.class.getSimpleName());
    private final EventBus eventBus;
    private final TokenStore tokenStore;
    private final int batchSize;
    private final ExecutorService executorService;
    // Only touched from the processing loop in doProcess().
    private TrackingEventStream eventStream;
    // Token of the last message in the batch currently being processed; written by the
    // processing loop and read by the interceptor registered in start().
    private volatile TrackingToken lastToken;

    /**
     * Creates a processor with a batch size of 1, {@link NoOpMessageMonitor#INSTANCE} as
     * monitor, {@link RollbackConfigurationType#ANY_THROWABLE} as rollback configuration and
     * {@link NoOpErrorHandler#INSTANCE} as error handler.
     *
     * @param str                 the name of this processor
     * @param eventHandlerInvoker the invoker passed on to the super class
     * @param eventBus            the bus to stream events from; must not be {@code null}
     * @param tokenStore          the store used to fetch and store tracking tokens; must not be {@code null}
     */
    public TrackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, EventBus eventBus, TokenStore tokenStore) {
        this(str, eventHandlerInvoker, eventBus, tokenStore, NoOpMessageMonitor.INSTANCE);
    }

    /**
     * Creates a processor with a batch size of 1,
     * {@link RollbackConfigurationType#ANY_THROWABLE} as rollback configuration and
     * {@link NoOpErrorHandler#INSTANCE} as error handler.
     *
     * @param str                 the name of this processor
     * @param eventHandlerInvoker the invoker passed on to the super class
     * @param eventBus            the bus to stream events from; must not be {@code null}
     * @param tokenStore          the store used to fetch and store tracking tokens; must not be {@code null}
     * @param messageMonitor      the monitor passed on to the super class
     */
    public TrackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, EventBus eventBus, TokenStore tokenStore, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        this(str, eventHandlerInvoker, RollbackConfigurationType.ANY_THROWABLE, NoOpErrorHandler.INSTANCE, eventBus, tokenStore, 1, messageMonitor);
    }

    /**
     * Creates a fully configured processor.
     *
     * @param str                   the name of this processor
     * @param eventHandlerInvoker   the invoker passed on to the super class
     * @param rollbackConfiguration the rollback configuration passed on to the super class
     * @param errorHandler          the error handler passed on to the super class
     * @param eventBus              the bus to stream events from; must not be {@code null}
     * @param tokenStore            the store used to fetch and store tracking tokens; must not be {@code null}
     * @param i                     the maximum number of events per batch; must be greater than 0
     * @param messageMonitor        the monitor passed on to the super class
     * @throws NullPointerException if {@code eventBus} or {@code tokenStore} is {@code null}
     */
    public TrackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, RollbackConfiguration rollbackConfiguration, ErrorHandler errorHandler, EventBus eventBus, TokenStore tokenStore, int i, MessageMonitor<? super EventMessage<?>> messageMonitor) {
        super(str, eventHandlerInvoker, rollbackConfiguration, errorHandler, messageMonitor);
        // Validate every argument before allocating the executor.
        this.eventBus = Objects.requireNonNull(eventBus, "eventBus may not be null");
        this.tokenStore = Objects.requireNonNull(tokenStore, "tokenStore may not be null");
        Assert.isTrue(i > 0, "batchSize needs to be greater than 0");
        this.batchSize = i;
        this.executorService = Executors.newSingleThreadExecutor(new AxonThreadFactory(threadGroup));
    }

    /**
     * Registers an interceptor that stores the tracking token when the last message of a batch
     * is about to be committed, then submits the processing loop to the executor.
     *
     * <p>NOTE(review): every call registers a new interceptor and submits a new loop task;
     * confirm that callers invoke this at most once per instance.
     */
    public void start() {
        registerInterceptor((unitOfWork, interceptorChain) -> {
            // The inner parameter needs its own name: a lambda parameter may not redeclare
            // a parameter of the enclosing lambda.
            unitOfWork.onPrepareCommit(uow -> {
                EventMessage<?> message = (EventMessage<?>) uow.getMessage();
                // Read the volatile field once so the comparison and the store use the same value.
                TrackingToken batchEndToken = this.lastToken;
                if ((message instanceof TrackedEventMessage)
                        && ((TrackedEventMessage<?>) message).trackingToken().equals(batchEndToken)) {
                    this.tokenStore.storeToken(getName(), 0, batchEndToken);
                }
            });
            return interceptorChain.proceed();
        });
        this.executorService.submit(() -> {
            try {
                doProcess();
            } catch (Throwable th) {
                // Report through the logger rather than raw stderr so the failure of the
                // processing loop shows up in the application's logs.
                logger.error(String.format("Event processor [%s] terminated due to an unexpected error.", getName()), th);
            }
        });
    }

    /**
     * Shuts down the executor. The processing loop checks the executor's shutdown state at the
     * start of each iteration and exits once it observes the shutdown.
     *
     * <p>NOTE(review): the event stream opened by the loop is not released here; confirm
     * whether {@link TrackingEventStream} requires explicit cleanup.
     */
    public void shutDown() {
        this.executorService.shutdown();
    }

    /**
     * Runs the processing loop until the executor is shut down or the thread is interrupted.
     *
     * <p>Each iteration waits up to one second for a first event, then keeps adding events that
     * are immediately available until {@code batchSize} is reached. A non-empty batch is passed
     * to {@code doProcessBatch} after recording the token of its last message.
     */
    protected void doProcess() {
        while (!this.executorService.isShutdown()) {
            if (this.eventStream == null) {
                // Open the stream lazily, starting from the stored token.
                this.eventStream = this.eventBus.streamEvents(this.tokenStore.fetchToken(getName(), 0));
            }
            List<TrackedEventMessage<?>> batch = new ArrayList<>();
            try {
                // Bounded wait for the first event so the shutdown flag is rechecked regularly.
                if (this.eventStream.hasNextAvailable(1, TimeUnit.SECONDS)) {
                    batch.add(this.eventStream.nextAvailable());
                }
                while (batch.size() < this.batchSize && this.eventStream.hasNextAvailable()) {
                    batch.add(this.eventStream.nextAvailable());
                }
                if (!batch.isEmpty()) {
                    // Must be set before processing: the commit interceptor compares against it.
                    this.lastToken = batch.get(batch.size() - 1).trackingToken();
                    doProcessBatch(batch);
                }
            } catch (InterruptedException e) {
                logger.error(String.format("Event processor [%s] was interrupted. Shutting down.", getName()), e);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
