package net.wessendorf.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import net.wessendorf.kafka.cdi.annotation.Consumer;
import net.wessendorf.kafka.serialization.CafdiSerdes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wessendorf/kafka/impl/DelegationKafkaConsumer.class */
/**
 * {@link Runnable} that owns a {@link KafkaConsumer}, subscribes it to the topic named by a
 * {@link Consumer} annotation and forwards every polled record to the annotated listener
 * method via reflection.
 *
 * <p>Typical life cycle: {@link #initialize(String, AnnotatedMethod, BeanManager)} once,
 * then {@link #run()} on a worker thread, then {@link #shutdown()} to stop the poll loop.
 */
public class DelegationKafkaConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(DelegationKafkaConsumer.class);

    /** Timeout, in milliseconds, handed to each {@code poll} call of the loop in {@link #run()}. */
    private static final long POLL_TIMEOUT_MS = 100L;

    /** Contextual reference of the bean that declares the listener method. */
    private Object consumerInstance;
    /** Created by {@link #initialize}; {@code null} until then. */
    private KafkaConsumer<?, ?> consumer;
    private String topic;
    private AnnotatedMethod<?> annotatedListenerMethod;
    private final AtomicBoolean running = new AtomicBoolean(true);
    final Properties properties = new Properties();

    /**
     * Picks the key type of the consumer: the first parameter of a two-argument listener,
     * otherwise the fallback type taken from the annotation.
     */
    private Class<?> consumerKeyType(Class<?> cls, AnnotatedMethod<?> annotatedMethod) {
        // getParameterTypes() returns a fresh copy on every call, so read it only once.
        Class<?>[] parameterTypes = annotatedMethod.getJavaMember().getParameterTypes();
        return parameterTypes.length == 2 ? parameterTypes[0] : cls;
    }

    /**
     * Picks the value type of the consumer: the second parameter of a two-argument listener,
     * otherwise the first parameter.
     */
    private Class<?> consumerValueType(AnnotatedMethod<?> annotatedMethod) {
        Class<?>[] parameterTypes = annotatedMethod.getJavaMember().getParameterTypes();
        return parameterTypes.length == 2 ? parameterTypes[1] : parameterTypes[0];
    }

    /** Builds the underlying consumer with deserializers looked up through {@link CafdiSerdes}. */
    private <K, V> void createKafkaConsumer(Class<K> cls, Class<V> cls2, Properties properties) {
        this.consumer = new KafkaConsumer<>(
                properties,
                CafdiSerdes.serdeFrom(cls).deserializer(),
                CafdiSerdes.serdeFrom(cls2).deserializer());
    }

    /**
     * Reads the {@link Consumer} annotation of the listener method, configures and creates the
     * Kafka consumer, and obtains a reference to the bean declaring the listener.
     *
     * @param str             value stored under the {@code bootstrap.servers} property
     * @param annotatedMethod listener method carrying the {@link Consumer} annotation
     * @param beanManager     used to resolve the bean that declares the listener method
     */
    public void initialize(String str, AnnotatedMethod<?> annotatedMethod, BeanManager beanManager) {
        Consumer consumerAnnotation = annotatedMethod.getAnnotation(Consumer.class);
        this.topic = consumerAnnotation.topic();
        this.annotatedListenerMethod = annotatedMethod;

        Class<?> keyType = consumerKeyType(consumerAnnotation.keyType(), annotatedMethod);
        Class<?> valueType = consumerValueType(annotatedMethod);

        this.properties.put("bootstrap.servers", str);
        this.properties.put("group.id", consumerAnnotation.groupId());
        this.properties.put("key.deserializer", CafdiSerdes.serdeFrom(keyType).deserializer().getClass());
        this.properties.put("value.deserializer", CafdiSerdes.serdeFrom(valueType).deserializer().getClass());
        createKafkaConsumer(keyType, valueType, this.properties);

        Class<?> declaringClass = annotatedMethod.getJavaMember().getDeclaringClass();
        Bean<?> bean = beanManager.resolve(beanManager.getBeans(declaringClass, new Annotation[0]));
        this.consumerInstance =
                beanManager.getReference(bean, declaringClass, beanManager.createCreationalContext(bean));
    }

    /**
     * Subscribes to the topic and polls until {@link #isRunning()} turns {@code false}.
     *
     * <p>A {@link WakeupException} is swallowed when the loop has already been asked to stop and
     * rethrown otherwise; a {@link SerializationException} is logged and rethrown. Whatever the
     * outcome, the consumer is closed exactly once on the way out.
     */
    @Override
    public void run() {
        try {
            this.consumer.subscribe(Arrays.asList(this.topic));
            LOGGER.trace("subscribed to {}", this.topic);

            // The listener signature cannot change while polling, so inspect it once up front.
            boolean passKey = this.annotatedListenerMethod.getJavaMember().getParameterTypes().length == 2;
            while (isRunning()) {
                for (ConsumerRecord<?, ?> consumerRecord : this.consumer.poll(POLL_TIMEOUT_MS)) {
                    dispatch(consumerRecord, passKey);
                }
            }
        } catch (WakeupException e) {
            // A wakeup while no shutdown was requested is unexpected, so propagate it.
            if (isRunning()) {
                LOGGER.trace("Exception", e);
                throw e;
            }
        } catch (SerializationException e) {
            LOGGER.warn("Consumer exception", e);
            throw e;
        } finally {
            // Single close point: a failing close() is no longer followed by a second attempt.
            LOGGER.info("Close the consumer.");
            this.consumer.close();
        }
    }

    /**
     * Invokes the listener method with the record value, preceded by the record key when
     * {@code passKey} is set. Reflection failures are logged and do not stop the poll loop.
     */
    private void dispatch(ConsumerRecord<?, ?> consumerRecord, boolean passKey) {
        try {
            LOGGER.trace("dispatching payload {} to consumer", consumerRecord.value());
            if (passKey) {
                this.annotatedListenerMethod.getJavaMember()
                        .invoke(this.consumerInstance, consumerRecord.key(), consumerRecord.value());
            } else {
                this.annotatedListenerMethod.getJavaMember()
                        .invoke(this.consumerInstance, consumerRecord.value());
            }
            LOGGER.trace("dispatched payload {} to consumer", consumerRecord.value());
        } catch (IllegalAccessException | InvocationTargetException e) {
            LOGGER.error("error dispatching received value to consumer", e);
        }
    }

    /**
     * @return {@code true} until {@link #shutdown()} has been called
     */
    public boolean isRunning() {
        return this.running.get();
    }

    /**
     * Marks the poll loop as stopped and calls {@code wakeup()} on the consumer. Calling this
     * before {@link #initialize} only clears the running flag, because no consumer exists yet.
     */
    public void shutdown() {
        LOGGER.info("Shutting down the consumer.");
        this.running.set(false);
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }
}
