package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;
import org.axonframework.common.Assert;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.WrappedToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.messaging.unitofwork.BatchingUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage.class */
public class WorkPackage {
    /** Logger for this class, resolved through the current lookup class. */
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    /** Number of queued entries at which {@code hasRemainingCapacity()} starts reporting {@code false}. */
    static final int BUFFER_SIZE = 1024;
    /** Name passed to the token store together with the segment id; also used in log messages and resource keys. */
    private final String name;
    /** Store used to persist tokens and to extend the claim for this package's segment. */
    private final TokenStore tokenStore;
    /** Wraps token store interactions and is attached to every batch's unit of work. */
    private final TransactionManager transactionManager;
    /** Executor on which the worker task of this package is submitted. */
    private final ExecutorService executorService;
    /** Decides, per event, whether this package's segment can handle it. */
    private final EventFilter eventFilter;
    /** Receives every non-empty batch of events for actual processing. */
    private final BatchProcessor batchProcessor;
    /** The segment this work package works on. */
    private final Segment segment;
    /** Upper bound on the number of events collected into one batch. */
    private final int batchSize;
    /** Milliseconds added to "now" to compute the next moment at which the claim is extended. */
    private final long claimExtensionThreshold;
    /** Callback that applies a status transformation to this segment's {@link TrackerStatus}. */
    private final Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
    /** Time source used by {@code now()}. */
    private final Clock clock;
    /** Unit of work resource key under which the segment id is registered. */
    private final String segmentIdResourceKey;
    /** Unit of work resource key under which the last consumed token is registered. */
    private final String lastTokenResourceKey;
    /** Token of the entry most recently added to the processing queue; starts as the builder's initial token. */
    private TrackingToken lastDeliveredToken;
    /** Token advanced for every entry polled from the processing queue; starts as the builder's initial token. */
    private TrackingToken lastConsumedToken;
    /** Token most recently handed to the token store by {@code storeToken}. */
    private TrackingToken lastStoredToken;
    /** Epoch-millisecond timestamp after which the token claim should be extended. */
    private final AtomicLong nextClaimExtension;
    /** {@code true} while a batch is being handed to the batch processor. */
    private final AtomicBoolean processingEvents;
    /** Entries scheduled for this package, awaiting consumption by the worker task. */
    private final Queue<ProcessingEntry> processingQueue;
    /** Guards against submitting more than one worker task at a time. */
    private final AtomicBoolean scheduled;
    /** Non-null once an abort was requested; the contained future is completed by the worker task. */
    private final AtomicReference<CompletableFuture<Exception>> abortFlag;
    /** The exception (possibly {@code null}) the abort future is completed with. */
    private final AtomicReference<Exception> abortException;

    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$BatchProcessingEntry.class */
    /**
     * A {@link ProcessingEntry} that groups several entries so they are queued, and later added to a batch,
     * as a single unit.
     */
    private static class BatchProcessingEntry implements ProcessingEntry {

        private final List<ProcessingEntry> processingEntries = new ArrayList<>();

        /**
         * Appends the given entry to this group.
         *
         * @param processingEntry the entry to append
         */
        public void add(ProcessingEntry processingEntry) {
            processingEntries.add(processingEntry);
        }

        /**
         * Returns the tracking token of the first entry in this group.
         *
         * @return the first entry's tracking token
         * @throws IndexOutOfBoundsException when no entry has been added yet
         */
        @Override
        public TrackingToken trackingToken() {
            return processingEntries.get(0).trackingToken();
        }

        /**
         * Lets every grouped entry, in insertion order, add itself to the given batch.
         *
         * @param list          the batch under construction
         * @param trackingToken the token forwarded to each grouped entry
         */
        @Override
        public void addToBatch(List<TrackedEventMessage<?>> list, TrackingToken trackingToken) {
            for (ProcessingEntry entry : processingEntries) {
                entry.addToBatch(list, trackingToken);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Callback that processes a batch of events. The work package invokes it with the collected events, the
     * unit of work created for that batch and a singleton collection containing the package's segment.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$BatchProcessor.class */
    public interface BatchProcessor {
        /**
         * Processes the given events within the given unit of work.
         *
         * @param list       the events to process
         * @param unitOfWork the unit of work in which the events are processed
         * @param collection the segments the events are processed for
         * @throws Exception when processing fails
         */
        void processBatch(List<? extends EventMessage<?>> list, UnitOfWork<? extends EventMessage<?>> unitOfWork, Collection<Segment> collection) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$Builder.class */
    /**
     * Fluent builder for a {@link WorkPackage}. The batch size defaults to {@code 1}, the claim extension
     * threshold to {@code 5000} and the clock to {@code GenericEventMessage.clock}; no validation is performed
     * on any of the configured values.
     */
    public static class Builder {

        private String name;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private ExecutorService executorService;
        private EventFilter eventFilter;
        private BatchProcessor batchProcessor;
        private Segment segment;
        private TrackingToken initialToken;
        private Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater;
        private int batchSize = 1;
        private long claimExtensionThreshold = 5_000L;
        private Clock clock = GenericEventMessage.clock;

        Builder() {
        }

        /**
         * Sets the name used for token store interactions, log messages and resource keys.
         *
         * @param name the name to use
         * @return this builder, for chaining
         */
        public Builder name(String name) {
            this.name = name;
            return this;
        }

        /**
         * Sets the store used to persist tokens and extend claims.
         *
         * @param tokenStore the token store to use
         * @return this builder, for chaining
         */
        public Builder tokenStore(TokenStore tokenStore) {
            this.tokenStore = tokenStore;
            return this;
        }

        /**
         * Sets the transaction manager wrapping token store calls and attached to each batch's unit of work.
         *
         * @param transactionManager the transaction manager to use
         * @return this builder, for chaining
         */
        public Builder transactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
            return this;
        }

        /**
         * Sets the executor on which the work package submits its worker task.
         *
         * @param executorService the executor to use
         * @return this builder, for chaining
         */
        public Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /**
         * Sets the filter deciding whether an event can be handled for the configured segment.
         *
         * @param eventFilter the filter to use
         * @return this builder, for chaining
         */
        public Builder eventFilter(EventFilter eventFilter) {
            this.eventFilter = eventFilter;
            return this;
        }

        /**
         * Sets the processor that receives every non-empty batch of events.
         *
         * @param batchProcessor the batch processor to use
         * @return this builder, for chaining
         */
        public Builder batchProcessor(BatchProcessor batchProcessor) {
            this.batchProcessor = batchProcessor;
            return this;
        }

        /**
         * Sets the segment the work package works on.
         *
         * @param segment the segment to use
         * @return this builder, for chaining
         */
        public Builder segment(Segment segment) {
            this.segment = segment;
            return this;
        }

        /**
         * Sets the token the work package starts from; it seeds both the last delivered and the last
         * consumed token.
         *
         * @param initialToken the starting token, may be {@code null}
         * @return this builder, for chaining
         */
        public Builder initialToken(TrackingToken initialToken) {
            this.initialToken = initialToken;
            return this;
        }

        /**
         * Sets the maximum number of events collected into a single batch.
         *
         * @param batchSize the batch size to use
         * @return this builder, for chaining
         */
        public Builder batchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }

        /**
         * Sets the number of milliseconds after which the token claim is extended.
         *
         * @param claimExtensionThreshold the threshold in milliseconds
         * @return this builder, for chaining
         */
        public Builder claimExtensionThreshold(long claimExtensionThreshold) {
            this.claimExtensionThreshold = claimExtensionThreshold;
            return this;
        }

        /**
         * Sets the callback used to apply status changes for the configured segment.
         *
         * @param segmentStatusUpdater the status updater to use
         * @return this builder, for chaining
         */
        public Builder segmentStatusUpdater(Consumer<UnaryOperator<TrackerStatus>> segmentStatusUpdater) {
            this.segmentStatusUpdater = segmentStatusUpdater;
            return this;
        }

        /**
         * Sets the clock the work package reads the current time from.
         *
         * @param clock the clock to use
         * @return this builder, for chaining
         */
        public Builder clock(Clock clock) {
            this.clock = clock;
            return this;
        }

        /**
         * Creates a {@link WorkPackage} from the values configured on this builder.
         *
         * @return a new work package
         */
        public WorkPackage build() {
            return new WorkPackage(this);
        }
    }

    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$DefaultProcessingEntry.class */
    /**
     * A {@link ProcessingEntry} for a single event, remembering whether that event can be handled.
     */
    private static class DefaultProcessingEntry implements ProcessingEntry {

        private final TrackedEventMessage<?> eventMessage;
        private final boolean canHandle;

        /**
         * @param message     the event this entry represents
         * @param handleable  whether the event should end up in a batch
         */
        public DefaultProcessingEntry(TrackedEventMessage<?> message, boolean handleable) {
            eventMessage = message;
            canHandle = handleable;
        }

        /**
         * @return the tracking token carried by the wrapped event
         */
        @Override
        public TrackingToken trackingToken() {
            return eventMessage.trackingToken();
        }

        /**
         * Adds the wrapped event, re-stamped with the given token, to the batch; does nothing when the event
         * was marked as not handleable.
         */
        @Override
        public void addToBatch(List<TrackedEventMessage<?>> list, TrackingToken trackingToken) {
            if (!canHandle) {
                return;
            }
            list.add(eventMessage.withTrackingToken(trackingToken));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Predicate deciding whether an event can be handled for a given segment. An exception thrown by an
     * implementation causes the work package to be aborted.
     */
    @FunctionalInterface
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$EventFilter.class */
    public interface EventFilter {
        /**
         * @param trackedEventMessage the event to check
         * @param segment             the segment to check the event against
         * @return {@code true} when the event can be handled for the segment
         * @throws Exception when the check itself fails
         */
        boolean canHandle(TrackedEventMessage<?> trackedEventMessage, Segment segment) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/pooled/WorkPackage$ProcessingEntry.class */
    /**
     * An item in the work package's processing queue: it exposes a tracking token and knows how to add its
     * event(s) to a batch.
     */
    public interface ProcessingEntry {
        /**
         * @return the tracking token associated with this entry
         */
        TrackingToken trackingToken();

        /**
         * Adds this entry's event(s) to the given batch.
         *
         * @param list          the batch under construction
         * @param trackingToken the token to attach to the added event(s)
         */
        void addToBatch(List<TrackedEventMessage<?>> list, TrackingToken trackingToken);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a fresh {@link Builder} for configuring and constructing a {@link WorkPackage}.
     *
     * @return a new builder instance
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Creates a work package from the values held by the given builder. No validation is performed here.
     *
     * @param builder the builder carrying the configuration
     */
    private WorkPackage(Builder builder) {
        // Collaborators and configuration copied from the builder.
        this.name = builder.name;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.executorService = builder.executorService;
        this.eventFilter = builder.eventFilter;
        this.batchProcessor = builder.batchProcessor;
        this.segment = builder.segment;
        this.batchSize = builder.batchSize;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.segmentStatusUpdater = builder.segmentStatusUpdater;
        this.clock = builder.clock;

        // Keys under which segment id and token are registered as unit of work resources.
        this.segmentIdResourceKey = "Processor[" + builder.name + "]/SegmentId";
        this.lastTokenResourceKey = "Processor[" + builder.name + "]/Token";

        // Both progress markers start at the configured initial token.
        this.lastDeliveredToken = builder.initialToken;
        this.lastConsumedToken = builder.initialToken;

        // Internal state. now() reads the clock, so this must come after the clock assignment above.
        this.nextClaimExtension = new AtomicLong(now() + this.claimExtensionThreshold);
        this.processingEvents = new AtomicBoolean(false);
        this.processingQueue = new ConcurrentLinkedQueue<>();
        this.scheduled = new AtomicBoolean();
        this.abortFlag = new AtomicReference<>();
        this.abortException = new AtomicReference<>();
    }

    /**
     * Reads the current time from the configured clock.
     *
     * @return the current instant expressed as milliseconds since the epoch
     */
    private long now() {
        return clock.instant().toEpochMilli();
    }

    /**
     * Schedules the given events as one group on this work package. All events must carry equal tracking
     * tokens. When the last delivered token already covers every event, nothing is scheduled.
     * <p>
     * Otherwise every event is queued (together with whether it can be handled), the last delivered token is
     * updated and the worker is scheduled.
     *
     * @param list the events to schedule
     * @return {@code true} when at least one of the events can be handled by this package; {@code false} when
     *         the list is empty, all events are already covered, or none can be handled
     */
    public boolean scheduleEvents(List<TrackedEventMessage<?>> list) {
        if (list.isEmpty()) {
            return false;
        }
        assertEqualTokens(list);

        if (list.stream().allMatch(this::shouldNotSchedule)) {
            if (logger.isTraceEnabled()) {
                for (TrackedEventMessage<?> ignored : list) {
                    logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].",
                                 ignored.getIdentifier(),
                                 ignored.trackingToken().position().orElse(-1L),
                                 segment.getSegmentId(),
                                 lastDeliveredToken,
                                 ignored.trackingToken());
                }
            }
            return false;
        }

        // A plain loop: every event must be evaluated and queued, in order, regardless of earlier outcomes.
        BatchProcessingEntry batchEntry = new BatchProcessingEntry();
        boolean canHandleAny = false;
        for (TrackedEventMessage<?> event : list) {
            boolean handleable = canHandle(event);
            batchEntry.add(new DefaultProcessingEntry(event, handleable));
            canHandleAny |= handleable;
        }

        processingQueue.add(batchEntry);
        lastDeliveredToken = batchEntry.trackingToken();
        // Scheduled even when nothing can be handled, so the queued entries still get consumed.
        scheduleWorker();
        return canHandleAny;
    }

    /**
     * Asserts that every event in the given list carries a tracking token equal to that of the first event.
     * The list is expected to be non-empty.
     *
     * @param list the events whose tokens are compared
     */
    private void assertEqualTokens(List<TrackedEventMessage<?>> list) {
        TrackingToken expected = list.get(0).trackingToken();
        boolean allEqual = true;
        for (TrackedEventMessage<?> event : list) {
            if (!Objects.equals(expected, event.trackingToken())) {
                allEqual = false;
                break;
            }
        }
        Assert.isTrue(allEqual, () -> "All tokens should match when scheduling multiple events in one go.");
    }

    /**
     * Schedules a single event on this work package, unless the last delivered token already covers it.
     * A scheduled event is queued (together with whether it can be handled), becomes the last delivered token
     * and triggers scheduling of the worker.
     *
     * @param trackedEventMessage the event to schedule
     * @return {@code true} when the event was scheduled and can be handled by this package, {@code false}
     *         otherwise
     */
    public boolean scheduleEvent(TrackedEventMessage<?> trackedEventMessage) {
        if (shouldNotSchedule(trackedEventMessage)) {
            logger.trace("Ignoring event [{}] with position [{}] for work package [{}]. The last token [{}] covers event's token [{}].",
                         trackedEventMessage.getIdentifier(),
                         trackedEventMessage.trackingToken().position().orElse(-1L),
                         segment.getSegmentId(),
                         lastDeliveredToken,
                         trackedEventMessage.trackingToken());
            return false;
        }

        logger.debug("Assigned event [{}] with position [{}] to work package [{}].",
                     trackedEventMessage.getIdentifier(),
                     trackedEventMessage.trackingToken().position().orElse(-1L),
                     segment.getSegmentId());

        boolean handleable = canHandle(trackedEventMessage);
        processingQueue.add(new DefaultProcessingEntry(trackedEventMessage, handleable));
        lastDeliveredToken = trackedEventMessage.trackingToken();
        scheduleWorker();
        return handleable;
    }

    /**
     * Tells whether the given event must be skipped because the last delivered token already covers it.
     *
     * @param trackedEventMessage the event to check
     * @return {@code true} when a last delivered token exists and covers the event's token
     */
    private boolean shouldNotSchedule(TrackedEventMessage<?> trackedEventMessage) {
        if (lastDeliveredToken == null) {
            return false;
        }
        return lastDeliveredToken.covers(trackedEventMessage.trackingToken());
    }

    /**
     * Asks the event filter whether the given event can be handled for this package's segment. When the
     * filter throws, the failure is logged, the work package is aborted with that exception and the event is
     * reported as not handleable.
     *
     * @param trackedEventMessage the event to check
     * @return the filter's verdict, or {@code false} when the filter failed
     */
    private boolean canHandle(TrackedEventMessage<?> trackedEventMessage) {
        boolean verdict;
        try {
            verdict = eventFilter.canHandle(trackedEventMessage, segment);
        } catch (Exception e) {
            logger.warn("Error while detecting whether event can be handled in Work Package [{}]-[{}]. Aborting Work Package...",
                        segment.getSegmentId(), name, e);
            abort(e);
            verdict = false;
        }
        return verdict;
    }

    /**
     * Submits this package's worker task to the executor, unless one is already scheduled.
     * <p>
     * The task first checks for a pending abort: if present it replaces the segment's status with
     * {@code null}, completes the abort future with the recorded exception and stops without clearing the
     * scheduled flag. Otherwise it processes events (aborting the package when that fails), clears the
     * scheduled flag and reschedules itself when entries remain or an abort was requested meanwhile.
     */
    public void scheduleWorker() {
        if (!scheduled.compareAndSet(false, true)) {
            // A worker is already scheduled or running.
            return;
        }
        logger.debug("Scheduling Work Package [{}]-[{}] to process events.", segment.getSegmentId(), name);
        executorService.submit(() -> {
            CompletableFuture<Exception> aborting = abortFlag.get();
            if (aborting != null) {
                logger.debug("Work Package [{}]-[{}] should be aborted. Will shutdown this work package.",
                             segment.getSegmentId(), name);
                segmentStatusUpdater.accept(previousStatus -> null);
                aborting.complete(abortException.get());
                return;
            }

            try {
                processEvents();
            } catch (Exception e) {
                logger.warn("Error while processing batch in Work Package [{}]-[{}]. Aborting Work Package...",
                            segment.getSegmentId(), name, e);
                abort(e);
            }

            scheduled.set(false);
            if (processingQueue.isEmpty() && abortFlag.get() == null) {
                return;
            }
            logger.debug("Rescheduling Work Package [{}]-[{}] since there are events left.",
                         segment.getSegmentId(), name);
            scheduleWorker();
        });
    }

    /**
     * Drains up to {@code batchSize} handleable events from the processing queue and processes them in a
     * single unit of work; draining stops early when an abort has been triggered.
     * <p>
     * For a non-empty batch the token is stored on prepare-commit and the segment status is advanced after
     * commit. For an empty batch the segment status is advanced immediately, after which either the consumed
     * token is stored (when it differs from the last stored one and the claim extension moment has passed) or
     * the claim is extended when due.
     *
     * @throws Exception when the batch processor fails
     */
    private void processEvents() throws Exception {
        List<TrackedEventMessage<?>> eventBatch = new ArrayList<>();
        while (!isAbortTriggered() && eventBatch.size() < batchSize && !processingQueue.isEmpty()) {
            ProcessingEntry entry = processingQueue.poll();
            lastConsumedToken = WrappedToken.advance(lastConsumedToken, entry.trackingToken());
            // Entries that cannot be handled add nothing, yet still advance the consumed token.
            entry.addToBatch(eventBatch, lastConsumedToken);
        }

        if (eventBatch.isEmpty()) {
            segmentStatusUpdater.accept(status -> status.advancedTo(lastConsumedToken));
            // NOTE(review): identity comparison, not equals() - equal-but-distinct tokens count as changed.
            if (lastStoredToken != lastConsumedToken && now() > nextClaimExtension.get()) {
                transactionManager.executeInTransaction(() -> storeToken(lastConsumedToken));
            } else {
                extendClaimIfThresholdIsMet();
            }
            return;
        }

        logger.debug("Work Package [{}]-[{}] is processing a batch of {} events.",
                     segment.getSegmentId(), name, eventBatch.size());
        try {
            processingEvents.set(true);
            UnitOfWork<TrackedEventMessage<?>> unitOfWork = new BatchingUnitOfWork<>(eventBatch);
            unitOfWork.attachTransaction(transactionManager);
            unitOfWork.resources().put(segmentIdResourceKey, segment.getSegmentId());
            unitOfWork.resources().put(lastTokenResourceKey, lastConsumedToken);
            unitOfWork.onPrepareCommit(uow -> storeToken(lastConsumedToken));
            unitOfWork.afterCommit(
                    uow -> segmentStatusUpdater.accept(status -> status.advancedTo(lastConsumedToken))
            );
            batchProcessor.processBatch(eventBatch, unitOfWork, Collections.singleton(segment));
        } finally {
            processingEvents.set(false);
        }
    }

    /**
     * Extends the token claim for this package's segment when the current time has passed the next claim
     * extension moment, and then moves that moment {@code claimExtensionThreshold} milliseconds into the
     * future. Does nothing when the moment has not been reached yet.
     */
    public void extendClaimIfThresholdIsMet() {
        if (now() <= nextClaimExtension.get()) {
            return;
        }
        // Arguments ordered (segment id, name), matching the other "Work Package [{}]-[{}]" messages.
        logger.debug("Work Package [{}]-[{}] will extend its token claim.", segment.getSegmentId(), name);
        transactionManager.executeInTransaction(() -> tokenStore.extendClaim(name, segment.getSegmentId()));
        nextClaimExtension.set(now() + claimExtensionThreshold);
    }

    /**
     * Stores the given token for this package's name and segment, remembers it as the last stored token and
     * moves the next claim extension moment {@code claimExtensionThreshold} milliseconds into the future.
     * Callers are responsible for invoking this within a transaction.
     *
     * @param trackingToken the token to store
     */
    private void storeToken(TrackingToken trackingToken) {
        // Arguments ordered (segment id, name), matching the other "Work Package [{}]-[{}]" messages.
        logger.debug("Work Package [{}]-[{}] will store token [{}].", segment.getSegmentId(), name, trackingToken);
        tokenStore.storeToken(trackingToken, name, segment.getSegmentId());
        lastStoredToken = trackingToken;
        nextClaimExtension.set(now() + claimExtensionThreshold);
    }

    /**
     * Tells whether this work package can accept more entries.
     *
     * @return {@code true} while the processing queue holds fewer than {@link #BUFFER_SIZE} entries
     */
    public boolean hasRemainingCapacity() {
        return processingQueue.size() < BUFFER_SIZE;
    }

    /**
     * Tells whether this work package has nothing left to do.
     *
     * @return {@code true} when the processing queue is empty and no worker is scheduled
     */
    public boolean isDone() {
        if (!processingQueue.isEmpty()) {
            return false;
        }
        return !scheduled.get();
    }

    /**
     * @return the segment this work package works on
     */
    public Segment segment() {
        return segment;
    }

    /**
     * @return the token of the event(s) most recently scheduled on this work package, or the initial token
     *         when nothing has been scheduled yet
     */
    public TrackingToken lastDeliveredToken() {
        return lastDeliveredToken;
    }

    /**
     * @return {@code true} once an abort has been requested for this work package
     */
    public boolean isAbortTriggered() {
        CompletableFuture<Exception> pendingAbort = abortFlag.get();
        return pendingAbort != null;
    }

    /**
     * Requests this work package to abort. When a reason is given, the segment's status (if present and not
     * already in an error state) is marked with that error. The first request creates the abort future and
     * records the reason; later requests return the same future and only record their reason when none was
     * recorded before. The worker is (re)scheduled so the abort gets picked up.
     *
     * @param exc the reason for aborting, or {@code null} when there is none
     * @return a future that the worker task completes with the recorded reason (possibly {@code null})
     */
    public CompletableFuture<Exception> abort(Exception exc) {
        if (exc != null) {
            // Arguments ordered (segment id, name), matching the other "Work Package [{}]-[{}]" messages.
            logger.debug("Abort request received for Work Package [{}]-[{}].", segment.getSegmentId(), name, exc);
            segmentStatusUpdater.accept(previousStatus -> {
                if (previousStatus == null) {
                    return null;
                }
                return previousStatus.isErrorState() ? previousStatus : previousStatus.markError(exc);
            });
        }

        CompletableFuture<Exception> abortTask = abortFlag.updateAndGet(currentFlag -> {
            if (currentFlag == null) {
                abortException.set(exc);
                return new CompletableFuture<>();
            }
            // Already aborting: keep the first non-null reason.
            abortException.updateAndGet(currentReason -> currentReason == null ? exc : currentReason);
            return currentFlag;
        });
        scheduleWorker();
        return abortTask;
    }

    /**
     * @return {@code true} while a batch of events is being handed to the batch processor
     */
    public boolean isProcessingEvents() {
        return processingEvents.get();
    }
}
