package net.wessendorf.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import net.wessendorf.kafka.cdi.annotation.Consumer;
import net.wessendorf.kafka.serialization.CafdiSerdes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wessendorf/kafka/impl/DelegationKafkaConsumer.class */
public class DelegationKafkaConsumer implements Runnable {
    private Object consumerInstance;
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final AnnotatedMethod annotatedListenerMethod;
    private final BeanManager beanManager;
    private final Logger logger = LoggerFactory.getLogger(DelegationKafkaConsumer.class);
    final Properties properties = new Properties();

    public DelegationKafkaConsumer(String str, AnnotatedMethod annotatedMethod, BeanManager beanManager) {
        Consumer consumer = (Consumer) annotatedMethod.getAnnotation(Consumer.class);
        this.topic = consumer.topic();
        String groupId = consumer.groupId();
        Class<?> keyType = consumer.keyType();
        this.annotatedListenerMethod = annotatedMethod;
        this.beanManager = beanManager;
        this.properties.put("bootstrap.servers", str);
        this.properties.put("group.id", groupId);
        this.properties.put("key.deserializer", CafdiSerdes.serdeFrom(keyType(keyType, annotatedMethod)).deserializer().getClass());
        this.properties.put("value.deserializer", CafdiSerdes.serdeFrom(valueType(annotatedMethod)).deserializer().getClass());
        this.consumer = new KafkaConsumer<>(this.properties);
    }

    private Class<?> keyType(Class<?> cls, AnnotatedMethod annotatedMethod) {
        return annotatedMethod.getJavaMember().getParameterTypes().length == 2 ? annotatedMethod.getJavaMember().getParameterTypes()[0] : cls;
    }

    private Class<?> valueType(AnnotatedMethod annotatedMethod) {
        return annotatedMethod.getJavaMember().getParameterTypes().length == 2 ? annotatedMethod.getJavaMember().getParameterTypes()[1] : annotatedMethod.getJavaMember().getParameterTypes()[0];
    }

    public void initialize() {
        Bean resolve = this.beanManager.resolve(this.beanManager.getBeans(this.annotatedListenerMethod.getJavaMember().getDeclaringClass(), new Annotation[0]));
        this.consumerInstance = this.beanManager.getReference(resolve, this.annotatedListenerMethod.getJavaMember().getDeclaringClass(), this.beanManager.createCreationalContext(resolve));
    }

    @Override // java.lang.Runnable
    public void run() {
        this.consumer.subscribe(Arrays.asList(this.topic));
        this.logger.debug("subscribed to {}", this.topic);
        while (1 != 0) {
            Iterator it = this.consumer.poll(100L).iterator();
            while (it.hasNext()) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                try {
                    this.logger.trace("dispatching payload {} to consumer", consumerRecord.value());
                    if (this.annotatedListenerMethod.getJavaMember().getParameterTypes().length == 2) {
                        this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecord.key(), consumerRecord.value());
                    } else {
                        this.annotatedListenerMethod.getJavaMember().invoke(this.consumerInstance, consumerRecord.value());
                    }
                    this.logger.trace("dispatched payload {} to consumer", consumerRecord.value());
                } catch (IllegalAccessException | InvocationTargetException e) {
                    this.logger.error("error dispatching received value to consumer", e);
                }
            }
        }
    }
}
