package net.wessendorf.kafka.cdi.extension;

import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.enterprise.context.spi.CreationalContext;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.spi.AfterDeploymentValidation;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.AnnotatedType;
import javax.enterprise.inject.spi.BeanManager;
import javax.enterprise.inject.spi.Extension;
import javax.enterprise.inject.spi.InjectionPoint;
import javax.enterprise.inject.spi.InjectionTarget;
import javax.enterprise.inject.spi.ProcessAnnotatedType;
import javax.enterprise.inject.spi.ProcessInjectionTarget;
import javax.enterprise.inject.spi.WithAnnotations;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import net.wessendorf.kafka.SimpleKafkaProducer;
import net.wessendorf.kafka.cdi.annotation.Consumer;
import net.wessendorf.kafka.cdi.annotation.KafkaConfig;
import net.wessendorf.kafka.cdi.annotation.Producer;
import net.wessendorf.kafka.impl.DelegationKafkaConsumer;
import net.wessendorf.kafka.impl.InjectedKafkaProducer;
import net.wessendorf.kafka.serialization.CafdiSerdes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wessendorf/kafka/cdi/extension/KafkaExtension.class */
/**
 * CDI portable extension that connects annotated application code to Kafka.
 *
 * <p>During container bootstrap it:
 * <ul>
 *   <li>reads the bootstrap servers from the first {@link KafkaConfig}-annotated type it sees,</li>
 *   <li>collects every method annotated with {@link Consumer},</li>
 *   <li>after deployment validation, wraps each collected method in a
 *       {@link DelegationKafkaConsumer} and submits it to an executor, and</li>
 *   <li>wraps every {@link InjectionTarget} so that fields annotated with {@link Producer}
 *       receive an {@link InjectedKafkaProducer} after regular injection.</li>
 * </ul>
 *
 * @param <X> the type carried by the observed {@link ProcessAnnotatedType} events
 */
public class KafkaExtension<X> implements Extension {
    /** JNDI name under which the container-managed executor is looked up. */
    private static final String MANAGED_EXECUTOR_JNDI_NAME = "java:comp/DefaultManagedExecutorService";
    /** Core and maximum size of the executor created when the JNDI lookup fails. */
    private static final int FALLBACK_POOL_SIZE = 16;
    /** Keep-alive, in minutes, of the executor created when the JNDI lookup fails. */
    private static final long FALLBACK_KEEP_ALIVE_MINUTES = 10L;

    /** Resolved bootstrap servers; stays {@code null} until a {@link KafkaConfig} type is processed. */
    private String bootstrapServers = null;
    /** {@link Consumer}-annotated methods discovered during type scanning. */
    private final Set<AnnotatedMethod<?>> listenerMethods =
            Collections.newSetFromMap(new ConcurrentHashMap<AnnotatedMethod<?>, Boolean>());
    private final Logger logger = LoggerFactory.getLogger(KafkaExtension.class);

    /**
     * Records the Kafka bootstrap servers from a {@link KafkaConfig}-annotated type.
     *
     * <p>Only the first configuration wins: once {@code bootstrapServers} is set, later
     * events are ignored.
     *
     * @param processAnnotatedType the container event for a type carrying {@link KafkaConfig}
     */
    public void kafkaConfig(@Observes @WithAnnotations({KafkaConfig.class}) ProcessAnnotatedType<X> processAnnotatedType) {
        final AnnotatedType<X> annotatedType = processAnnotatedType.getAnnotatedType();
        this.logger.trace("Kafka config scanning type: {}", annotatedType.getJavaClass().getName());

        final KafkaConfig kafkaConfig = annotatedType.getAnnotation(KafkaConfig.class);
        if (kafkaConfig == null || this.bootstrapServers != null) {
            return;
        }

        this.logger.info("setting bootstrap.servers IP");
        this.logger.trace("Got '{}'", kafkaConfig.bootstrapServers());
        this.bootstrapServers = VerySimpleEnvironmentResolver.simpleBootstrapServerResolver(kafkaConfig.bootstrapServers());
    }

    /**
     * Collects every method annotated with {@link Consumer} on the scanned type so it can be
     * wired to a Kafka consumer in {@link #afterDeploymentValidation}.
     *
     * @param processAnnotatedType the container event for a type that uses {@link Consumer}
     */
    public void registerListeners(@Observes @WithAnnotations({Consumer.class}) ProcessAnnotatedType<X> processAnnotatedType) {
        final AnnotatedType<X> annotatedType = processAnnotatedType.getAnnotatedType();
        this.logger.trace("scanning type: {}", annotatedType.getJavaClass().getName());

        for (AnnotatedMethod<?> annotatedMethod : annotatedType.getMethods()) {
            if (annotatedMethod.isAnnotationPresent(Consumer.class)) {
                this.logger.debug("found annotated listener method, adding for further processing");
                this.listenerMethods.add(annotatedMethod);
            }
        }
    }

    /**
     * Creates one {@link DelegationKafkaConsumer} per collected listener method and hands it
     * to an executor.
     *
     * @param afterDeploymentValidation the container lifecycle event (not otherwise used)
     * @param beanManager               passed through to each {@link DelegationKafkaConsumer}
     */
    public void afterDeploymentValidation(@Observes AfterDeploymentValidation afterDeploymentValidation, BeanManager beanManager) {
        this.logger.debug("wiring annotated listener method to internal Kafka Consumer");
        for (AnnotatedMethod<?> listenerMethod : this.listenerMethods) {
            submitToExecutor(new DelegationKafkaConsumer(this.bootstrapServers, listenerMethod, beanManager));
        }
    }

    /**
     * Replaces the injection target of every bean with a wrapper that, after the regular
     * injection, populates fields annotated with {@link Producer}.
     *
     * <p>All other {@link InjectionTarget} operations are forwarded unchanged to the original
     * target. Note that the method-level {@code <X>} intentionally shadows the class-level
     * type parameter; the signature is kept as-is for compatibility.
     *
     * @param processInjectionTarget the container event exposing the original injection target
     * @param <X>                    the bean type of the injection target
     */
    public <X> void processInjectionTarget(@Observes ProcessInjectionTarget<X> processInjectionTarget) {
        final InjectionTarget<X> delegate = processInjectionTarget.getInjectionTarget();
        final AnnotatedType<X> annotatedType = processInjectionTarget.getAnnotatedType();

        processInjectionTarget.setInjectionTarget(new InjectionTarget<X>() {
            @Override
            public void inject(X instance, CreationalContext<X> creationalContext) {
                delegate.inject(instance, creationalContext);

                // Only fields declared directly on the bean class are inspected.
                Arrays.asList(annotatedType.getJavaClass().getDeclaredFields()).forEach(field -> {
                    final Producer producer = field.getAnnotation(Producer.class);
                    if (producer == null) {
                        return;
                    }
                    final String topic = producer.topic();
                    if (!field.getType().isAssignableFrom(SimpleKafkaProducer.class)) {
                        return;
                    }

                    field.setAccessible(true);
                    try {
                        // The field's two generic type arguments select the key and value serializers.
                        // NOTE(review): a non-parameterized field, or a type argument that is not a
                        // plain Class, fails here with ClassCastException - confirm whether that is intended.
                        final ParameterizedType producerType = (ParameterizedType) field.getGenericType();
                        final Class<?> keySerializer = CafdiSerdes
                                .serdeFrom((Class<?>) producerType.getActualTypeArguments()[0])
                                .serializer()
                                .getClass();
                        final Class<?> valueSerializer = CafdiSerdes
                                .serdeFrom((Class<?>) producerType.getActualTypeArguments()[1])
                                .serializer()
                                .getClass();
                        field.set(instance, KafkaExtension.this.createInjectionProducer(
                                KafkaExtension.this.bootstrapServers, topic, keySerializer, valueSerializer));
                    } catch (IllegalAccessException | IllegalArgumentException e) {
                        // Best effort: the failure is logged and the field is left untouched.
                        KafkaExtension.this.logger.error("could not inject producer", e);
                    }
                });
            }

            @Override
            public void postConstruct(X instance) {
                delegate.postConstruct(instance);
            }

            @Override
            public void preDestroy(X instance) {
                // Forward to preDestroy (not dispose) so the bean's @PreDestroy callbacks run.
                delegate.preDestroy(instance);
            }

            @Override
            public void dispose(X instance) {
                delegate.dispose(instance);
            }

            @Override
            public Set<InjectionPoint> getInjectionPoints() {
                return delegate.getInjectionPoints();
            }

            @Override
            public X produce(CreationalContext<X> creationalContext) {
                return delegate.produce(creationalContext);
            }
        });
    }

    /**
     * Initializes the given consumer and submits it to an executor.
     *
     * <p>The container-managed executor is looked up via JNDI; when that lookup fails a
     * {@link ThreadPoolExecutor} is created for this consumer instead.
     *
     * @param delegationKafkaConsumer the consumer to initialize and run
     */
    private void submitToExecutor(DelegationKafkaConsumer delegationKafkaConsumer) {
        ExecutorService executorService;
        try {
            executorService = InitialContext.doLookup(MANAGED_EXECUTOR_JNDI_NAME);
        } catch (NamingException e) {
            this.logger.warn("Could not find a managed ExecutorService, creating one manually");
            executorService = new ThreadPoolExecutor(
                    FALLBACK_POOL_SIZE,
                    FALLBACK_POOL_SIZE,
                    FALLBACK_KEEP_ALIVE_MINUTES,
                    TimeUnit.MINUTES,
                    new LinkedBlockingDeque<Runnable>());
        }
        delegationKafkaConsumer.initialize();
        executorService.submit(delegationKafkaConsumer);
    }

    /**
     * Builds the producer instance that is injected into {@link Producer}-annotated fields.
     *
     * @param str  value for the {@code bootstrap.servers} property
     * @param str2 topic passed to the {@link InjectedKafkaProducer}
     * @param cls  value for the {@code key.serializer} property
     * @param cls2 value for the {@code value.serializer} property
     * @return a new {@link InjectedKafkaProducer} configured with the given properties
     */
    public org.apache.kafka.clients.producer.Producer createInjectionProducer(String str, String str2, Class<?> cls, Class<?> cls2) {
        final Properties properties = new Properties();
        properties.put("bootstrap.servers", str);
        properties.put("key.serializer", cls);
        properties.put("value.serializer", cls2);
        return new InjectedKafkaProducer(properties, str2);
    }
}
