package io.americanexpress.synapse.subscriber.kafka.config;

import io.americanexpress.synapse.subscriber.kafka.config.BaseKafkaPropertiesConfiguration;
import io.americanexpress.synapse.subscriber.kafka.errorhandler.BaseKafkaSubscriberErrorHandler;
import io.americanexpress.synapse.subscriber.kafka.filter.BaseKafkaSubscriberMessageFilter;
import io.americanexpress.synapse.subscriber.kafka.interceptor.BaseKafkaSubscriberMetricInterceptor;
import java.util.Optional;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;

@EnableKafka
/* loaded from: input_file:io/americanexpress/synapse/subscriber/kafka/config/BaseKafkaSubscriberConfiguration.class */
public abstract class BaseKafkaSubscriberConfiguration<C extends BaseKafkaPropertiesConfiguration, E extends BaseKafkaSubscriberErrorHandler, F extends BaseKafkaSubscriberMessageFilter, I extends BaseKafkaSubscriberMetricInterceptor> {
    private static final String partitionCountKey = "kafka.partitions.count";
    private static final String recordFilterEnabledKey = "kafka.subscriber.filter.enabled";
    private static final String batchSubscriberEnabledKey = "kafka.subscriber.batch.enabled";
    private final C kafkaPropertiesConfiguration;
    private final I recordInterceptor;
    private final E kafkaErrorHandler;
    private F recordFilteringStrategy;
    private final Integer partitions;
    private final boolean recordFilteringEnabled;
    private final boolean batchSubscriberEnabled;

    public boolean isRecordFilteringEnabled() {
        return this.recordFilteringEnabled;
    }

    public boolean isBatchSubscriberEnabled() {
        return this.batchSubscriberEnabled;
    }

    protected BaseKafkaSubscriberConfiguration(C c, E e, F f, Environment environment, I i) {
        this(c, e, environment, i);
        this.recordFilteringStrategy = f;
    }

    protected BaseKafkaSubscriberConfiguration(C c, E e, Environment environment, I i) {
        this.kafkaPropertiesConfiguration = c;
        this.kafkaErrorHandler = e;
        this.recordInterceptor = i;
        this.partitions = (Integer) Optional.ofNullable((Integer) environment.getProperty(partitionCountKey, Integer.class)).orElse(1);
        this.batchSubscriberEnabled = ((Boolean) Optional.ofNullable((Boolean) environment.getProperty(batchSubscriberEnabledKey, Boolean.class)).orElse(Boolean.FALSE)).booleanValue();
        this.recordFilteringEnabled = ((Boolean) Optional.ofNullable((Boolean) environment.getProperty(recordFilterEnabledKey, Boolean.class)).orElse(Boolean.FALSE)).booleanValue();
        this.recordFilteringStrategy = null;
    }

    private ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory(this.kafkaPropertiesConfiguration.buildConsumerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> baseKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory.setConcurrency(this.partitions);
        concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
        if (isRecordFilteringEnabled()) {
            concurrentKafkaListenerContainerFactory.setAckDiscarded(true);
            concurrentKafkaListenerContainerFactory.setRecordFilterStrategy((RecordFilterStrategy) Optional.ofNullable(this.recordFilteringStrategy).orElseThrow(() -> {
                return new IllegalArgumentException("Please consider defining a bean of type: " + String.valueOf(BaseKafkaSubscriberMessageFilter.class));
            }));
        }
        if (!((Boolean) Optional.ofNullable(this.kafkaPropertiesConfiguration.getConsumer().getEnableAutoCommit()).orElse(Boolean.FALSE)).booleanValue()) {
            concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        }
        if (isBatchSubscriberEnabled()) {
            concurrentKafkaListenerContainerFactory.setBatchListener(Boolean.valueOf(this.batchSubscriberEnabled));
            concurrentKafkaListenerContainerFactory.setBatchInterceptor(this.recordInterceptor);
        }
        concurrentKafkaListenerContainerFactory.setCommonErrorHandler((CommonErrorHandler) Optional.ofNullable(this.kafkaErrorHandler).orElseThrow(() -> {
            return new IllegalArgumentException("Please consider defining a bean of type: " + String.valueOf(BaseKafkaSubscriberErrorHandler.class));
        }));
        concurrentKafkaListenerContainerFactory.setRecordInterceptor(this.recordInterceptor);
        return concurrentKafkaListenerContainerFactory;
    }
}
