package org.axonframework.extensions.multitenancy.configuration;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.Configuration;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.config.EventProcessingModule;
import org.axonframework.eventhandling.DirectEventProcessingStrategy;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.extensions.multitenancy.TenantWrappedTransactionManager;
import org.axonframework.extensions.multitenancy.components.TenantDescriptor;
import org.axonframework.extensions.multitenancy.components.TenantProvider;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterProcessor;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterQueue;
import org.axonframework.extensions.multitenancy.components.deadletterqueue.MultiTenantDeadLetterQueueFactory;
import org.axonframework.extensions.multitenancy.components.eventhandeling.MultiTenantEventProcessor;
import org.axonframework.extensions.multitenancy.components.eventstore.MultiTenantEventStore;
import org.axonframework.extensions.multitenancy.components.eventstore.MultiTenantSubscribableMessageSource;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.deadletter.SequencedDeadLetterProcessor;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;

/* loaded from: input_file:org/axonframework/extensions/multitenancy/configuration/MultiTenantEventProcessingModule.class */
/**
 * An {@link EventProcessingModule} that builds one event processor per tenant.
 * <p>
 * For every processor name accepted by the {@link MultiTenantEventProcessorPredicate}, the factory methods in
 * this class return a {@link MultiTenantEventProcessor} whose tenant segment factory builds a regular
 * subscribing, tracking or pooled streaming processor named {@code <processorName>@<tenantId>}. That per-tenant
 * processor uses a {@link TenantWrappedTransactionManager} around the transaction manager configured for the
 * processor name. The {@link MultiTenantEventProcessor} is subscribed to the {@link TenantProvider}.
 * <p>
 * Processor names rejected by the predicate get a single, ordinary processor instead.
 */
public class MultiTenantEventProcessingModule extends EventProcessingModule {

    /**
     * Key used to look up the processor-independent pooled streaming customization in {@code psepConfigs}.
     */
    private static final String DEFAULT_PSEP_CONFIG_KEY = "___DEFAULT_PSEP_CONFIG";

    private final TenantProvider tenantProvider;
    private final MultiTenantStreamableMessageSourceProvider multiTenantStreamableMessageSourceProvider;
    private final MultiTenantEventProcessorPredicate multiTenantEventProcessorPredicate;
    protected final MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory;

    /**
     * Creates a module without dead letter queue support, using the given source unchanged for every tenant and
     * {@link MultiTenantEventProcessorPredicate#enableMultiTenancy()} as the predicate.
     *
     * @param tenantProvider the provider every multi-tenant processor is subscribed to
     */
    public MultiTenantEventProcessingModule(TenantProvider tenantProvider) {
        this(tenantProvider, null);
    }

    /**
     * Creates a module using the given source unchanged for every tenant and
     * {@link MultiTenantEventProcessorPredicate#enableMultiTenancy()} as the predicate.
     *
     * @param tenantProvider                    the provider every multi-tenant processor is subscribed to
     * @param multiTenantDeadLetterQueueFactory factory for multi-tenant dead letter queues; may be {@code null},
     *                                          in which case {@link #registerDeadLetterQueue(String, Function)}
     *                                          throws
     */
    public MultiTenantEventProcessingModule(TenantProvider tenantProvider, MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory) {
        this(tenantProvider,
             (source, processorName, tenantDescriptor, configuration) -> source,
             multiTenantDeadLetterQueueFactory,
             MultiTenantEventProcessorPredicate.enableMultiTenancy());
    }

    /**
     * Creates a fully customized module.
     *
     * @param tenantProvider                             the provider every multi-tenant processor is subscribed to
     * @param multiTenantStreamableMessageSourceProvider builds the streamable source used by a tenant's tracking
     *                                                   or pooled streaming processor
     * @param multiTenantDeadLetterQueueFactory          factory for multi-tenant dead letter queues; may be
     *                                                   {@code null}
     * @param multiTenantEventProcessorPredicate         decides, per processor name, whether a multi-tenant
     *                                                   processor is built
     */
    public MultiTenantEventProcessingModule(TenantProvider tenantProvider, MultiTenantStreamableMessageSourceProvider multiTenantStreamableMessageSourceProvider, MultiTenantDeadLetterQueueFactory<EventMessage<?>> multiTenantDeadLetterQueueFactory, MultiTenantEventProcessorPredicate multiTenantEventProcessorPredicate) {
        this.tenantProvider = tenantProvider;
        this.multiTenantStreamableMessageSourceProvider = multiTenantStreamableMessageSourceProvider;
        this.multiTenantDeadLetterQueueFactory = multiTenantDeadLetterQueueFactory;
        this.multiTenantEventProcessorPredicate = multiTenantEventProcessorPredicate;
    }

    /**
     * Builds the name of a tenant-specific processor: {@code <processorName>@<tenantId>}.
     */
    private static String getName(String processorName, TenantDescriptor tenantDescriptor) {
        return processorName + "@" + tenantDescriptor.tenantId();
    }

    /**
     * Looks up the processor built for the given tenant.
     *
     * @param str              the processor name, without tenant suffix
     * @param tenantDescriptor the tenant whose processor is requested
     * @return the processor registered as {@code <str>@<tenantId>}, or an empty {@link Optional} if there is none
     */
    public Optional<EventProcessor> eventProcessor(String str, TenantDescriptor tenantDescriptor) {
        return Optional.ofNullable(eventProcessors().get(getName(str, tenantDescriptor)));
    }

    /**
     * Returns the processors known to the parent module, plus the tenant-specific processors held by every
     * {@link MultiTenantEventProcessor} among them, keyed by processor name.
     * <p>
     * Entries of the parent module win when a name occurs in both sets.
     *
     * @return a map from processor name to processor
     */
    public Map<String, EventProcessor> eventProcessors() {
        Map<String, EventProcessor> registered = super.eventProcessors();
        // Instance check rather than "runtimeClass.isAssignableFrom(MultiTenantEventProcessor.class)": the latter
        // rejects subclasses of MultiTenantEventProcessor and accepts supertypes the cast below cannot handle.
        Map<String, EventProcessor> all = registered.values().stream()
                .filter(MultiTenantEventProcessor.class::isInstance)
                .map(MultiTenantEventProcessor.class::cast)
                .flatMap(multiTenantProcessor -> multiTenantProcessor.tenantEventProcessors().stream())
                .collect(Collectors.toMap(EventProcessor::getName, processor -> processor));
        all.putAll(registered);
        return all;
    }

    /**
     * Builds the subscribing processor for the given name.
     *
     * @param str                      the processor name
     * @param eventHandlerInvoker      the invoker handed to every processor that is built
     * @param subscribableMessageSource the source to subscribe to; if it is a
     *                                 {@link MultiTenantSubscribableMessageSource}, each tenant processor uses
     *                                 that tenant's segment of it
     * @return a {@link MultiTenantEventProcessor} subscribed to the tenant provider, or a plain
     *         {@link SubscribingEventProcessor} when the predicate rejects {@code str}
     */
    public EventProcessor subscribingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> subscribableMessageSource) {
        if (!this.multiTenantEventProcessorPredicate.test(str)) {
            return buildSep(str, eventHandlerInvoker, subscribableMessageSource);
        }
        MultiTenantEventProcessor multiTenantProcessor = MultiTenantEventProcessor.builder()
                .name(str)
                .tenantSegmentFactory(tenantDescriptor -> buildSep(
                        tenantDescriptor, str, eventHandlerInvoker, tenantSource(subscribableMessageSource, tenantDescriptor)))
                .build();
        this.tenantProvider.subscribe(multiTenantProcessor);
        return multiTenantProcessor;
    }

    /**
     * Resolves the subscribable source of one tenant: the tenant's segment when the given source is a
     * {@link MultiTenantSubscribableMessageSource}, otherwise the given source itself.
     */
    private static SubscribableMessageSource<? extends EventMessage<?>> tenantSource(SubscribableMessageSource<? extends EventMessage<?>> messageSource, TenantDescriptor tenantDescriptor) {
        if (messageSource instanceof MultiTenantSubscribableMessageSource) {
            return (SubscribableMessageSource) ((MultiTenantSubscribableMessageSource) messageSource).tenantSegments().get(tenantDescriptor);
        }
        return messageSource;
    }

    /**
     * Builds a subscribing processor with the given name and transaction manager; all other components are the
     * ones the parent module resolves for {@code processorName}.
     */
    private SubscribingEventProcessor buildSep(String processorName, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> messageSource, TransactionManager transactionManager) {
        return SubscribingEventProcessor.builder()
                .name(processorName)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(super.rollbackConfiguration(processorName))
                .errorHandler(super.errorHandler(processorName))
                .messageMonitor(super.messageMonitor(SubscribingEventProcessor.class, processorName))
                .messageSource(messageSource)
                .processingStrategy(DirectEventProcessingStrategy.INSTANCE)
                .transactionManager(transactionManager)
                .build();
    }

    /**
     * Builds the subscribing processor of a single tenant, named {@code <processorName>@<tenantId>}.
     */
    private SubscribingEventProcessor buildSep(TenantDescriptor tenantDescriptor, String processorName, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
        TransactionManager tenantTransactionManager =
                new TenantWrappedTransactionManager(super.transactionManager(processorName), tenantDescriptor);
        return buildSep(getName(processorName, tenantDescriptor), eventHandlerInvoker, messageSource, tenantTransactionManager);
    }

    /**
     * Builds a subscribing processor that is not tenant-specific.
     */
    private SubscribingEventProcessor buildSep(String processorName, EventHandlerInvoker eventHandlerInvoker, SubscribableMessageSource<? extends EventMessage<?>> messageSource) {
        return buildSep(processorName, eventHandlerInvoker, messageSource, super.transactionManager(processorName));
    }

    /**
     * Builds the tracking processor for the given name.
     *
     * @param str                                 the processor name
     * @param eventHandlerInvoker                 the invoker handed to every processor that is built
     * @param trackingEventProcessorConfiguration the tracking configuration handed to every processor that is built
     * @param streamableMessageSource             the source to read from; for tenant processors it is first
     *                                            narrowed to the tenant's segment (when it is a
     *                                            {@link MultiTenantEventStore}) and then passed through the
     *                                            {@link MultiTenantStreamableMessageSourceProvider}
     * @return a {@link MultiTenantEventProcessor} subscribed to the tenant provider, or a plain
     *         {@link TrackingEventProcessor} when the predicate rejects {@code str}
     */
    public EventProcessor trackingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, TrackingEventProcessorConfiguration trackingEventProcessorConfiguration, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource) {
        if (!this.multiTenantEventProcessorPredicate.test(str)) {
            return buildTep(str, eventHandlerInvoker, streamableMessageSource, trackingEventProcessorConfiguration);
        }
        MultiTenantEventProcessor multiTenantProcessor = MultiTenantEventProcessor.builder()
                .name(str)
                .tenantSegmentFactory(tenantDescriptor -> {
                    // this.configuration is read when the segment is built, not when this method is called.
                    StreamableMessageSource<TrackedEventMessage<?>> tenantStream =
                            this.multiTenantStreamableMessageSourceProvider.build(
                                    defaultSource(streamableMessageSource, tenantDescriptor), str, tenantDescriptor, this.configuration);
                    return buildTep(tenantDescriptor, str, eventHandlerInvoker, tenantStream, trackingEventProcessorConfiguration);
                })
                .build();
        this.tenantProvider.subscribe(multiTenantProcessor);
        return multiTenantProcessor;
    }

    /**
     * Builds a tracking processor with the given name and transaction manager; all other components are the
     * ones the parent module resolves for {@code processorName}.
     */
    private TrackingEventProcessor buildTep(String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TransactionManager transactionManager, TrackingEventProcessorConfiguration trackingConfiguration) {
        return TrackingEventProcessor.builder()
                .name(processorName)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(super.rollbackConfiguration(processorName))
                .errorHandler(super.errorHandler(processorName))
                .messageMonitor(super.messageMonitor(TrackingEventProcessor.class, processorName))
                .messageSource(messageSource)
                .tokenStore(super.tokenStore(processorName))
                .transactionManager(transactionManager)
                .trackingEventProcessorConfiguration(trackingConfiguration)
                .build();
    }

    /**
     * Builds the tracking processor of a single tenant, named {@code <processorName>@<tenantId>}.
     */
    private TrackingEventProcessor buildTep(TenantDescriptor tenantDescriptor, String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TrackingEventProcessorConfiguration trackingConfiguration) {
        TransactionManager tenantTransactionManager =
                new TenantWrappedTransactionManager(super.transactionManager(processorName), tenantDescriptor);
        return buildTep(getName(processorName, tenantDescriptor), eventHandlerInvoker, messageSource, tenantTransactionManager, trackingConfiguration);
    }

    /**
     * Builds a tracking processor that is not tenant-specific.
     */
    private TrackingEventProcessor buildTep(String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TrackingEventProcessorConfiguration trackingConfiguration) {
        return buildTep(processorName, eventHandlerInvoker, messageSource, super.transactionManager(processorName), trackingConfiguration);
    }

    /**
     * Builds the pooled streaming processor for the given name.
     * <p>
     * In both the multi-tenant and the plain case the builder is customized, in this order, by the default entry
     * of {@code psepConfigs}, the entry of {@code psepConfigs} registered for {@code str}, and the given
     * {@code pooledStreamingProcessorConfiguration}.
     *
     * @param str                                   the processor name
     * @param eventHandlerInvoker                   the invoker handed to every processor that is built
     * @param configuration                         the configuration passed to the customizations and used to
     *                                              register executor shutdown handlers
     * @param streamableMessageSource               the source to read from; for tenant processors it is first
     *                                              narrowed to the tenant's segment (when it is a
     *                                              {@link MultiTenantEventStore}) and then passed through the
     *                                              {@link MultiTenantStreamableMessageSourceProvider}
     * @param pooledStreamingProcessorConfiguration the customization applied last to every builder
     * @return a {@link MultiTenantEventProcessor} subscribed to the tenant provider, or a plain
     *         {@link PooledStreamingEventProcessor} when the predicate rejects {@code str}
     */
    public EventProcessor pooledStreamingEventProcessor(String str, EventHandlerInvoker eventHandlerInvoker, Configuration configuration, StreamableMessageSource<TrackedEventMessage<?>> streamableMessageSource, EventProcessingConfigurer.PooledStreamingProcessorConfiguration pooledStreamingProcessorConfiguration) {
        if (!this.multiTenantEventProcessorPredicate.test(str)) {
            // Processors excluded from multi-tenancy must still honour the registered customizations.
            return customizePsep(str, configuration, pooledStreamingProcessorConfiguration,
                                 psepBuilder(str, eventHandlerInvoker, streamableMessageSource, configuration))
                    .build();
        }
        MultiTenantEventProcessor multiTenantProcessor = MultiTenantEventProcessor.builder()
                .name(str)
                .tenantSegmentFactory(tenantDescriptor -> {
                    // this.configuration is read when the segment is built, not when this method is called.
                    StreamableMessageSource<TrackedEventMessage<?>> tenantStream =
                            this.multiTenantStreamableMessageSourceProvider.build(
                                    defaultSource(streamableMessageSource, tenantDescriptor), str, tenantDescriptor, this.configuration);
                    return customizePsep(str, configuration, pooledStreamingProcessorConfiguration,
                                         psepBuilder(tenantDescriptor, str, eventHandlerInvoker, tenantStream, configuration))
                            .build();
                })
                .build();
        this.tenantProvider.subscribe(multiTenantProcessor);
        return multiTenantProcessor;
    }

    /**
     * Applies the default, the name-specific and the call-specific pooled streaming customizations, in that
     * order, to the given builder. Customizations are looked up under the plain processor name, also for
     * tenant-specific builders.
     */
    private PooledStreamingEventProcessor.Builder customizePsep(String processorName, Configuration configuration, EventProcessingConfigurer.PooledStreamingProcessorConfiguration processorConfiguration, PooledStreamingEventProcessor.Builder builder) {
        return this.psepConfigs.getOrDefault(DEFAULT_PSEP_CONFIG_KEY, EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp())
                .andThen(this.psepConfigs.getOrDefault(processorName, EventProcessingConfigurer.PooledStreamingProcessorConfiguration.noOp()))
                .andThen(processorConfiguration)
                .apply(configuration, builder);
    }

    /**
     * Resolves the streamable source of one tenant: the tenant's segment when the given source is a
     * {@link MultiTenantEventStore}, otherwise the given source itself.
     */
    private static StreamableMessageSource<TrackedEventMessage<?>> defaultSource(StreamableMessageSource<TrackedEventMessage<?>> messageSource, TenantDescriptor tenantDescriptor) {
        if (messageSource instanceof MultiTenantEventStore) {
            return ((MultiTenantEventStore) messageSource).tenantSegments().get(tenantDescriptor);
        }
        return messageSource;
    }

    /**
     * Creates a pooled streaming builder with the given name and transaction manager; all other components are
     * the ones the parent module resolves for {@code processorName}. Coordinator and worker executors are created
     * on demand and shut down through the given configuration.
     */
    private PooledStreamingEventProcessor.Builder psepBuilder(String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TransactionManager transactionManager, Configuration configuration) {
        return PooledStreamingEventProcessor.builder()
                .name(processorName)
                .eventHandlerInvoker(eventHandlerInvoker)
                .rollbackConfiguration(super.rollbackConfiguration(processorName))
                .errorHandler(super.errorHandler(processorName))
                .messageMonitor(super.messageMonitor(PooledStreamingEventProcessor.class, processorName))
                .messageSource(messageSource)
                .tokenStore(super.tokenStore(processorName))
                .transactionManager(transactionManager)
                .coordinatorExecutor(name -> managedExecutor("Coordinator[" + name + "]", configuration))
                .workerExecutor(name -> managedExecutor("WorkPackage[" + name + "]", configuration));
    }

    /**
     * Creates the pooled streaming builder of a single tenant, named {@code <processorName>@<tenantId>}.
     */
    private PooledStreamingEventProcessor.Builder psepBuilder(TenantDescriptor tenantDescriptor, String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, Configuration configuration) {
        TransactionManager tenantTransactionManager =
                new TenantWrappedTransactionManager(super.transactionManager(processorName), tenantDescriptor);
        return psepBuilder(getName(processorName, tenantDescriptor), eventHandlerInvoker, messageSource, tenantTransactionManager, configuration);
    }

    /**
     * Creates a pooled streaming builder that is not tenant-specific.
     */
    private PooledStreamingEventProcessor.Builder psepBuilder(String processorName, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, Configuration configuration) {
        return psepBuilder(processorName, eventHandlerInvoker, messageSource, super.transactionManager(processorName), configuration);
    }

    /**
     * Registers a multi-tenant dead letter queue for the given processing group.
     * <p>
     * The queue obtained from the {@link MultiTenantDeadLetterQueueFactory} is given a supplier that invokes
     * {@code function} with this module's configuration, and is then registered with the parent module.
     *
     * @param str      the processing group to register the queue for
     * @param function builds a dead letter queue from the configuration
     * @return the result of registering the multi-tenant queue with the parent module
     * @throws AxonConfigurationException if this module was created without a
     *                                    {@link MultiTenantDeadLetterQueueFactory}
     */
    public EventProcessingConfigurer registerDeadLetterQueue(@Nonnull String str, @Nonnull Function<Configuration, SequencedDeadLetterQueue<EventMessage<?>>> function) {
        if (this.multiTenantDeadLetterQueueFactory == null) {
            throw new AxonConfigurationException("Cannot register a DeadLetterQueue without a MultiTenantDeadLetterQueueFactory");
        }
        MultiTenantDeadLetterQueue<EventMessage<?>> multiTenantQueue = this.multiTenantDeadLetterQueueFactory.getDeadLetterQueue(str);
        // this.configuration is read when the supplier is invoked, not at registration time.
        multiTenantQueue.registerDeadLetterQueueSupplier(() -> function.apply(this.configuration));
        return super.registerDeadLetterQueue(str, configuration -> multiTenantQueue);
    }

    /**
     * Returns the parent module's dead letter processor for the given processing group, wrapped in a
     * {@link MultiTenantDeadLetterProcessor}.
     *
     * @param str the processing group
     * @return the wrapped processor, or an empty {@link Optional} if the parent module has none
     */
    public Optional<SequencedDeadLetterProcessor<EventMessage<?>>> sequencedDeadLetterProcessor(@Nonnull String str) {
        return super.sequencedDeadLetterProcessor(str).map(MultiTenantDeadLetterProcessor::new);
    }

    /**
     * Creates a single-threaded scheduled executor and registers its shutdown with the given configuration.
     */
    private ScheduledExecutorService managedExecutor(String threadGroupName, Configuration configuration) {
        ScheduledExecutorService executor = defaultExecutor(threadGroupName);
        configuration.onShutdown(executor::shutdown);
        return executor;
    }

    /**
     * Creates a scheduled executor with one thread, using an {@link AxonThreadFactory} created from the given
     * name.
     */
    private ScheduledExecutorService defaultExecutor(String str) {
        return Executors.newScheduledThreadPool(1, new AxonThreadFactory(str));
    }
}
