package org.axonframework.eventhandling.pooled;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.configuration.LifecycleRegistry;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackerStatus;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.RollbackConfiguration;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/pooled/PooledStreamingEventProcessor.class */
public class PooledStreamingEventProcessor extends AbstractEventProcessor implements StreamingEventProcessor, Lifecycle {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final String name;
    private final StreamableMessageSource<TrackedEventMessage<?>> messageSource;
    private final TokenStore tokenStore;
    private final TransactionManager transactionManager;
    private final ScheduledExecutorService workerExecutor;
    private final Coordinator coordinator;
    private final Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken;
    private final long tokenClaimInterval;
    private final MaxSegmentProvider maxSegmentProvider;
    private final long claimExtensionThreshold;
    private final int batchSize;
    private final Clock clock;
    private final AtomicReference<String> tokenStoreIdentifier;
    private final Map<Integer, TrackerStatus> processingStatus;

    /* loaded from: input_file:org/axonframework/eventhandling/pooled/PooledStreamingEventProcessor$Builder.class */
    public static class Builder extends AbstractEventProcessor.Builder {
        private StreamableMessageSource<TrackedEventMessage<?>> messageSource;
        private TokenStore tokenStore;
        private TransactionManager transactionManager;
        private Function<String, ScheduledExecutorService> coordinatorExecutorBuilder;
        private Function<String, ScheduledExecutorService> workerExecutorBuilder;
        private int initialSegmentCount = 16;
        private Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> initialToken = streamableMessageSource -> {
            return ReplayToken.createReplayToken(streamableMessageSource.createHeadToken());
        };
        private long tokenClaimInterval = 5000;
        private MaxSegmentProvider maxSegmentProvider = MaxSegmentProvider.maxShort();
        private long claimExtensionThreshold = 5000;
        private int batchSize = 1;
        private Clock clock = GenericEventMessage.clock;
        private boolean coordinatorExtendsClaims = false;

        protected Builder() {
            rollbackConfiguration((RollbackConfiguration) RollbackConfigurationType.ANY_THROWABLE);
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder name(@Nonnull String str) {
            super.name(str);
            return this;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker) {
            super.eventHandlerInvoker(eventHandlerInvoker);
            return this;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder rollbackConfiguration(@Nonnull RollbackConfiguration rollbackConfiguration) {
            super.rollbackConfiguration(rollbackConfiguration);
            return this;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder errorHandler(@Nonnull ErrorHandler errorHandler) {
            super.errorHandler(errorHandler);
            return this;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public Builder spanFactory(@Nonnull EventProcessorSpanFactory eventProcessorSpanFactory) {
            super.spanFactory(eventProcessorSpanFactory);
            return this;
        }

        public Builder messageSource(@Nonnull StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
            BuilderUtils.assertNonNull(streamableMessageSource, "StreamableMessageSource may not be null");
            this.messageSource = streamableMessageSource;
            return this;
        }

        public Builder tokenStore(@Nonnull TokenStore tokenStore) {
            BuilderUtils.assertNonNull(tokenStore, "TokenStore may not be null");
            this.tokenStore = tokenStore;
            return this;
        }

        public Builder transactionManager(@Nonnull TransactionManager transactionManager) {
            BuilderUtils.assertNonNull(transactionManager, "TransactionManager may not be null");
            this.transactionManager = transactionManager;
            return this;
        }

        public Builder coordinatorExecutor(@Nonnull ScheduledExecutorService scheduledExecutorService) {
            BuilderUtils.assertNonNull(scheduledExecutorService, "The Coordinator's ScheduledExecutorService may not be null");
            this.coordinatorExecutorBuilder = str -> {
                return scheduledExecutorService;
            };
            return this;
        }

        public Builder coordinatorExecutor(@Nonnull Function<String, ScheduledExecutorService> function) {
            BuilderUtils.assertNonNull(function, "The Coordinator's ScheduledExecutorService builder may not be null");
            this.coordinatorExecutorBuilder = function;
            return this;
        }

        public Builder workerExecutor(@Nonnull ScheduledExecutorService scheduledExecutorService) {
            BuilderUtils.assertNonNull(scheduledExecutorService, "The Worker's ScheduledExecutorService may not be null");
            this.workerExecutorBuilder = str -> {
                return scheduledExecutorService;
            };
            return this;
        }

        public Builder workerExecutor(@Nonnull Function<String, ScheduledExecutorService> function) {
            BuilderUtils.assertNonNull(function, "The Worker's ScheduledExecutorService builder may not be null");
            this.workerExecutorBuilder = function;
            return this;
        }

        public Builder initialSegmentCount(int i) {
            BuilderUtils.assertStrictPositive(i, "The initial segment count should be a higher valuer than zero");
            this.initialSegmentCount = i;
            return this;
        }

        public Builder initialToken(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> function) {
            BuilderUtils.assertNonNull(function, "The initial token builder Function may not be null");
            this.initialToken = function;
            return this;
        }

        public Builder tokenClaimInterval(long j) {
            BuilderUtils.assertStrictPositive(j, "Token claim interval should be a higher valuer than zero");
            this.tokenClaimInterval = j;
            return this;
        }

        public Builder maxClaimedSegments(int i) {
            this.maxSegmentProvider = str -> {
                return i;
            };
            return this;
        }

        public Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider) {
            BuilderUtils.assertNonNull(maxSegmentProvider, "The max segment provider may not be null. Provide a lambda of type (processorName: String) -> maxSegmentsToClaim");
            BuilderUtils.assertStrictPositive(maxSegmentProvider.getMaxSegments(this.name), "Max claimed segments should be a higher valuer than zero");
            this.maxSegmentProvider = maxSegmentProvider;
            return this;
        }

        public Builder claimExtensionThreshold(long j) {
            BuilderUtils.assertStrictPositive(j, "The claim extension threshold should be a higher valuer than zero");
            this.claimExtensionThreshold = j;
            return this;
        }

        public Builder batchSize(int i) {
            BuilderUtils.assertStrictPositive(i, "The batch size should be a higher valuer than zero");
            this.batchSize = i;
            return this;
        }

        public Builder clock(@Nonnull Clock clock) {
            BuilderUtils.assertNonNull(clock, "Clock may not be null");
            this.clock = clock;
            return this;
        }

        public Builder enableCoordinatorClaimExtension() {
            this.coordinatorExtendsClaims = true;
            return this;
        }

        public PooledStreamingEventProcessor build() {
            return new PooledStreamingEventProcessor(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public void validate() throws AxonConfigurationException {
            super.validate();
            BuilderUtils.assertNonNull(this.messageSource, "The StreamableMessageSource is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.tokenStore, "The TokenStore is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.transactionManager, "The TransactionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.coordinatorExecutorBuilder, "The Coordinator ScheduledExecutorService is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.workerExecutorBuilder, "The Worker ScheduledExecutorService is a hard requirement and should be provided");
        }

        public String name() {
            return this.name;
        }

        @Override // org.axonframework.eventhandling.AbstractEventProcessor.Builder
        public /* bridge */ /* synthetic */ AbstractEventProcessor.Builder messageMonitor(@Nonnull MessageMonitor messageMonitor) {
            return messageMonitor((MessageMonitor<? super EventMessage<?>>) messageMonitor);
        }
    }

    protected PooledStreamingEventProcessor(Builder builder) {
        super(builder);
        this.tokenStoreIdentifier = new AtomicReference<>();
        this.processingStatus = new ConcurrentHashMap();
        this.name = builder.name();
        this.messageSource = builder.messageSource;
        this.tokenStore = builder.tokenStore;
        this.transactionManager = builder.transactionManager;
        this.workerExecutor = builder.workerExecutorBuilder.apply(this.name);
        this.initialToken = builder.initialToken;
        this.tokenClaimInterval = builder.tokenClaimInterval;
        this.maxSegmentProvider = builder.maxSegmentProvider;
        this.claimExtensionThreshold = builder.claimExtensionThreshold;
        this.batchSize = builder.batchSize;
        this.clock = builder.clock;
        this.coordinator = Coordinator.builder().name(this.name).messageSource(this.messageSource).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService(builder.coordinatorExecutorBuilder.apply(this.name)).workPackageFactory(this::spawnWorker).eventFilter(trackedEventMessage -> {
            return canHandleType(trackedEventMessage.getPayloadType());
        }).onMessageIgnored(eventMessage -> {
            this.reportIgnored(eventMessage);
        }).processingStatusUpdater((v1, v2) -> {
            statusUpdater(v1, v2);
        }).tokenClaimInterval(this.tokenClaimInterval).claimExtensionThreshold(this.claimExtensionThreshold).clock(this.clock).maxSegmentProvider(this.maxSegmentProvider).initialSegmentCount(builder.initialSegmentCount).initialToken(this.initialToken).coordinatorClaimExtension(builder.coordinatorExtendsClaims).segmentReleasedAction(segment -> {
            eventHandlerInvoker().segmentReleased(segment);
        }).build();
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // org.axonframework.lifecycle.Lifecycle
    public void registerLifecycleHandlers(@Nonnull LifecycleRegistry lifecycleRegistry) {
        lifecycleRegistry.onStart(1073741823, this::start);
        lifecycleRegistry.onShutdown(1073741823, this::shutdownAsync);
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public void start() {
        logger.info("Starting PooledStreamingEventProcessor [{}].", this.name);
        this.coordinator.start();
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public void shutDown() {
        shutdownAsync().join();
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public CompletableFuture<Void> shutdownAsync() {
        logger.info("Stopping PooledStreamingEventProcessor [{}]", this.name);
        return this.coordinator.stop();
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public boolean isRunning() {
        return this.coordinator.isRunning();
    }

    @Override // org.axonframework.eventhandling.EventProcessor
    public boolean isError() {
        return this.coordinator.isError();
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public String getTokenStoreIdentifier() {
        return this.tokenStoreIdentifier.updateAndGet(str -> {
            return str != null ? str : calculateIdentifier();
        });
    }

    private String calculateIdentifier() {
        return (String) this.transactionManager.fetchInTransaction(() -> {
            return this.tokenStore.retrieveStorageIdentifier().orElse("--unknown--");
        });
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public void releaseSegment(int i) {
        releaseSegment(i, this.tokenClaimInterval * 2, TimeUnit.MILLISECONDS);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public void releaseSegment(int i, long j, TimeUnit timeUnit) {
        this.coordinator.releaseUntil(i, GenericEventMessage.clock.instant().plusMillis(timeUnit.toMillis(j)));
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public CompletableFuture<Boolean> claimSegment(int i) {
        return this.coordinator.claimSegment(i);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public CompletableFuture<Boolean> splitSegment(int i) {
        if (this.tokenStore.requiresExplicitSegmentInitialization()) {
            return this.coordinator.splitSegment(i);
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely split tokens."));
        return completableFuture;
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public CompletableFuture<Boolean> mergeSegment(int i) {
        if (this.tokenStore.requiresExplicitSegmentInitialization()) {
            return this.coordinator.mergeSegment(i);
        }
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new UnsupportedOperationException("TokenStore must require explicit initialization to safely merge tokens."));
        return completableFuture;
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public boolean supportsReset() {
        return eventHandlerInvoker().supportsReset();
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public void resetTokens() {
        resetTokens(this.initialToken);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public <R> void resetTokens(R r) {
        resetTokens(this.initialToken, (Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken>) r);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> function) {
        resetTokens(function.apply(this.messageSource));
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public <R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>, TrackingToken> function, R r) {
        resetTokens(function.apply(this.messageSource), (TrackingToken) r);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public void resetTokens(@Nonnull TrackingToken trackingToken) {
        resetTokens(trackingToken, (TrackingToken) null);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public <R> void resetTokens(@Nonnull TrackingToken trackingToken, R r) {
        Assert.state(supportsReset(), () -> {
            return "The handlers assigned to this Processor do not support a reset.";
        });
        Assert.state(!isRunning(), () -> {
            return "The Processor must be shut down before triggering a reset.";
        });
        this.transactionManager.executeInTransaction(() -> {
            int[] fetchSegments = this.tokenStore.fetchSegments(getName());
            logger.debug("Processor [{}] will try to reset tokens for segments [{}].", this.name, fetchSegments);
            TrackingToken[] trackingTokenArr = (TrackingToken[]) Arrays.stream(fetchSegments).mapToObj(i -> {
                return this.tokenStore.fetchToken(getName(), i);
            }).toArray(i2 -> {
                return new TrackingToken[i2];
            });
            eventHandlerInvoker().performReset(r, null);
            IntStream.range(0, trackingTokenArr.length).forEach(i3 -> {
                this.tokenStore.storeToken(ReplayToken.createReplayToken(trackingTokenArr[i3], trackingToken, r), getName(), fetchSegments[i3]);
            });
            logger.info("Processor [{}] successfully reset tokens for segments [{}].", this.name, fetchSegments);
        });
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public int maxCapacity() {
        return this.maxSegmentProvider.getMaxSegments(this.name);
    }

    @Override // org.axonframework.eventhandling.StreamingEventProcessor
    public Map<Integer, EventTrackerStatus> processingStatus() {
        return Collections.unmodifiableMap(this.processingStatus);
    }

    private WorkPackage spawnWorker(Segment segment, TrackingToken trackingToken) {
        return WorkPackage.builder().name(this.name).tokenStore(this.tokenStore).transactionManager(this.transactionManager).executorService(this.workerExecutor).eventFilter((eventMessage, segment2) -> {
            return this.canHandle(eventMessage, segment2);
        }).batchProcessor((list, legacyUnitOfWork, collection) -> {
            this.processInUnitOfWork(list, legacyUnitOfWork, collection);
        }).segment(segment).initialToken(trackingToken).batchSize(this.batchSize).claimExtensionThreshold(this.claimExtensionThreshold).segmentStatusUpdater(singleStatusUpdater(segment.getSegmentId(), new TrackerStatus(segment, trackingToken))).clock(this.clock).build();
    }

    private Consumer<UnaryOperator<TrackerStatus>> singleStatusUpdater(int i, TrackerStatus trackerStatus) {
        return unaryOperator -> {
            this.processingStatus.compute(Integer.valueOf(i), (num, trackerStatus2) -> {
                return (TrackerStatus) unaryOperator.apply(trackerStatus2 == null ? trackerStatus : trackerStatus2);
            });
        };
    }

    private void statusUpdater(int i, UnaryOperator<TrackerStatus> unaryOperator) {
        this.processingStatus.computeIfPresent(Integer.valueOf(i), (num, trackerStatus) -> {
            return (TrackerStatus) unaryOperator.apply(trackerStatus);
        });
    }
}
