package org.axonframework.extensions.kafka.autoconfig;

import java.lang.invoke.MethodHandles;
import java.util.Collections;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.AsyncFetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.extensions.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.extensions.kafka.eventhandling.producer.DefaultProducerFactory;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.axonframework.spring.config.AxonConfiguration;
import org.axonframework.springboot.autoconfig.AxonAutoConfiguration;
import org.axonframework.springboot.autoconfig.InfraConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ConfigurationCondition;

/**
 * Spring Boot auto-configuration that wires the Axon Kafka extension beans: a message converter, a producer
 * factory, a publisher with its event-handling registration, a consumer factory, a fetcher and a streamable
 * message source.
 * <p>
 * Every bean is guarded by {@link ConditionalOnMissingBean}, so an application-defined bean of the same type takes
 * precedence over the default created here. All settings are read from the bound {@link KafkaProperties}.
 */
@AutoConfigureBefore({InfraConfiguration.class})
@EnableConfigurationProperties({KafkaProperties.class})
@Configuration
@ConditionalOnClass({KafkaPublisher.class})
@AutoConfigureAfter({AxonAutoConfiguration.class})
public class KafkaAutoConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /**
     * Name of the processing group that the {@link KafkaEventPublisher} handler is assigned to and for which an
     * event processor is registered in {@link #kafkaEventPublisher}.
     */
    private static final String EVENT_PUBLISHING_GROUP = "__axon-kafka-event-publishing-group";

    private final KafkaProperties properties;

    /**
     * Condition that holds when {@code axon.kafka.consumer.event-processor-mode} is set to either
     * {@code pooled_streaming} or {@code tracking}. Evaluated during the bean registration phase.
     */
    private static class StreamingProcessorModeCondition extends AnyNestedCondition {

        public StreamingProcessorModeCondition() {
            super(ConfigurationCondition.ConfigurationPhase.REGISTER_BEAN);
        }

        /** Marker type carrying the {@code pooled_streaming} property condition. */
        @ConditionalOnProperty(name = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "pooled_streaming")
        static class PooledStreamingCondition {
        }

        /** Marker type carrying the {@code tracking} property condition. */
        @ConditionalOnProperty(name = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "tracking")
        static class TrackingCondition {
        }
    }

    /**
     * Creates the auto-configuration backed by the given properties.
     *
     * @param kafkaProperties the bound Kafka properties used by all bean methods of this class
     */
    public KafkaAutoConfiguration(KafkaProperties kafkaProperties) {
        this.properties = kafkaProperties;
    }

    /**
     * Builds a {@link DefaultKafkaMessageConverter} using the given serializer.
     *
     * @param serializer the bean qualified as {@code eventSerializer}
     * @return the default message converter
     */
    @ConditionalOnMissingBean
    @Bean
    public KafkaMessageConverter<String, byte[]> kafkaMessageConverter(@Qualifier("eventSerializer") Serializer serializer) {
        return DefaultKafkaMessageConverter.builder()
                                           .serializer(serializer)
                                           .build();
    }

    /**
     * Builds a {@link DefaultProducerFactory} from the producer properties and the configured confirmation mode.
     * <p>
     * When a non-empty transactional id prefix is configured, the factory is forced into
     * {@link ConfirmationMode#TRANSACTIONAL}; a warning is logged if that overrides a non-transactional choice.
     *
     * @return the default producer factory
     */
    @ConditionalOnMissingBean
    @Bean({"axonKafkaProducerFactory"})
    public ProducerFactory<String, byte[]> kafkaProducerFactory() {
        ConfirmationMode configuredMode = this.properties.getPublisher().getConfirmationMode();
        String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();

        DefaultProducerFactory.Builder<String, byte[]> factoryBuilder =
                DefaultProducerFactory.<String, byte[]>builder()
                                      .configuration(this.properties.buildProducerProperties())
                                      .confirmationMode(configuredMode);

        if (isNonEmptyString(transactionIdPrefix)) {
            // A transactional id prefix only makes sense with transactional confirmation, so it wins.
            factoryBuilder.transactionalIdPrefix(transactionIdPrefix)
                          .confirmationMode(ConfirmationMode.TRANSACTIONAL);
            if (!configuredMode.isTransactional()) {
                logger.warn("The confirmation mode is set to [{}], whilst a transactional id prefix is present. The transactional id prefix overwrites the confirmation mode choice to TRANSACTIONAL", configuredMode);
            }
        }
        return factoryBuilder.build();
    }

    /**
     * Tells whether the given string carries at least one character.
     *
     * @param str the string to inspect, may be {@code null}
     * @return {@code true} if {@code str} is neither {@code null} nor empty
     */
    private static boolean isNonEmptyString(String str) {
        return str != null && !str.isEmpty();
    }

    /**
     * Builds the {@link KafkaPublisher} that publishes to the configured default topic. The bean's
     * {@code shutDown} method is invoked on context destruction.
     *
     * @param producerFactory       the factory handed to the publisher builder
     * @param kafkaMessageConverter the converter handed to the publisher builder
     * @param axonConfiguration     used to look up the message monitor named {@code kafkaPublisher}
     * @return the publisher
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({ProducerFactory.class, KafkaMessageConverter.class})
    @Bean(destroyMethod = "shutDown")
    public KafkaPublisher<String, byte[]> kafkaPublisher(ProducerFactory<String, byte[]> producerFactory, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, AxonConfiguration axonConfiguration) {
        return KafkaPublisher.<String, byte[]>builder()
                             .producerFactory(producerFactory)
                             .messageConverter(kafkaMessageConverter)
                             .messageMonitor(axonConfiguration.messageMonitor(KafkaPublisher.class, "kafkaPublisher"))
                             .topic(this.properties.getDefaultTopic())
                             .build();
    }

    /**
     * Builds the {@link KafkaEventPublisher}, registers it as an event handler in the dedicated publishing
     * processing group with a {@link PropagatingErrorHandler}, and registers an event processor for that group
     * according to the producer's configured event processor mode.
     *
     * @param kafkaPublisher            the publisher the event handler delegates to
     * @param kafkaProperties           source of the producer's event processor mode
     * @param eventProcessingConfigurer configurer on which the handler and processor are registered
     * @return the event publisher
     * @throws AxonConfigurationException if the event processor mode is not one of {@code SUBSCRIBING},
     *                                    {@code TRACKING} or {@code POOLED_STREAMING}
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({KafkaPublisher.class})
    @Bean
    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher, KafkaProperties kafkaProperties, EventProcessingConfigurer eventProcessingConfigurer) {
        KafkaEventPublisher<String, byte[]> eventPublisher =
                KafkaEventPublisher.<String, byte[]>builder()
                                   .kafkaPublisher(kafkaPublisher)
                                   .build();

        // NOTE(review): this predicate accepts KafkaEventPublisher and its supertypes; a subclass of
        // KafkaEventPublisher would not match. Left as-is to preserve behavior - confirm it is intended.
        eventProcessingConfigurer
                .registerEventHandler(configuration -> eventPublisher)
                .registerListenerInvocationErrorHandler(
                        EVENT_PUBLISHING_GROUP, configuration -> PropagatingErrorHandler.instance()
                )
                .assignHandlerTypesMatching(
                        EVENT_PUBLISHING_GROUP,
                        handlerType -> handlerType.isAssignableFrom(KafkaEventPublisher.class)
                );

        // An if/else chain (rather than a switch) so that a null mode ends up in the final branch as well.
        KafkaProperties.EventProcessorMode processorMode = kafkaProperties.getProducer().getEventProcessorMode();
        if (processorMode == KafkaProperties.EventProcessorMode.SUBSCRIBING) {
            eventProcessingConfigurer.registerSubscribingEventProcessor(EVENT_PUBLISHING_GROUP);
        } else if (processorMode == KafkaProperties.EventProcessorMode.TRACKING) {
            eventProcessingConfigurer.registerTrackingEventProcessor(EVENT_PUBLISHING_GROUP);
        } else if (processorMode == KafkaProperties.EventProcessorMode.POOLED_STREAMING) {
            eventProcessingConfigurer.registerPooledStreamingEventProcessor(EVENT_PUBLISHING_GROUP);
        } else {
            throw new AxonConfigurationException("Unknown Event Processor Mode [" + processorMode + "] detected");
        }
        return eventPublisher;
    }

    /**
     * Builds a {@link DefaultConsumerFactory} from the configured consumer properties.
     *
     * @return the default consumer factory
     */
    @ConditionalOnMissingBean
    @Bean({"axonKafkaConsumerFactory"})
    public ConsumerFactory<String, byte[]> kafkaConsumerFactory() {
        return new DefaultConsumerFactory<>(this.properties.buildConsumerProperties());
    }

    /**
     * Builds an {@link AsyncFetcher} with the configured poll timeout. The bean's {@code shutdown} method is
     * invoked on context destruction.
     *
     * @return the fetcher
     */
    @ConditionalOnMissingBean
    @Bean(destroyMethod = "shutdown")
    public Fetcher<?, ?, ?> kafkaFetcher() {
        return AsyncFetcher.builder()
                           .pollTimeout(this.properties.getFetcher().getPollTimeout())
                           .build();
    }

    /**
     * Builds a {@link StreamableKafkaMessageSource} reading the configured default topic. Only registered when
     * the consumer's event processor mode property selects a streaming mode (see
     * {@link StreamingProcessorModeCondition}).
     *
     * @param consumerFactory       the factory handed to the source builder
     * @param fetcher               the fetcher handed to the source builder
     * @param kafkaMessageConverter the converter handed to the source builder
     * @return the streamable message source
     */
    @ConditionalOnMissingBean
    @ConditionalOnBean({ConsumerFactory.class, KafkaMessageConverter.class, Fetcher.class})
    @Conditional({StreamingProcessorModeCondition.class})
    @Bean
    public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(ConsumerFactory<String, byte[]> consumerFactory, Fetcher<String, byte[], KafkaEventMessage> fetcher, KafkaMessageConverter<String, byte[]> kafkaMessageConverter) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                                           .topics(Collections.singletonList(this.properties.getDefaultTopic()))
                                           .consumerFactory(consumerFactory)
                                           .fetcher(fetcher)
                                           .messageConverter(kafkaMessageConverter)
                                           // The buffer size is read each time a new buffer is created.
                                           .bufferFactory(() -> new SortedKafkaMessageBuffer<>(
                                                   this.properties.getFetcher().getBufferSize()
                                           ))
                                           .build();
    }
}
