package net.wessendorf.kafka.impl;

import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.Properties;
import java.util.UUID;
import javax.enterprise.inject.spi.AnnotatedMethod;
import javax.enterprise.inject.spi.Bean;
import javax.enterprise.inject.spi.BeanManager;
import net.wessendorf.kafka.cdi.annotation.KafkaStream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/wessendorf/kafka/impl/DelegationStreamProcessor.class */
/**
 * Bridges a CDI bean method annotated with {@link KafkaStream} to a Kafka Streams topology.
 *
 * <p>{@link #init} opens a stream on the annotation's input topic, hands it to the annotated
 * method of the resolved bean, publishes whatever {@code KStream}/{@code KTable} that method
 * returns to the annotation's output topic, and starts the resulting {@link KafkaStreams}
 * instance.
 *
 * <p>NOTE(review): nothing in this class closes the started {@link KafkaStreams} instance; if the
 * container needs an orderly shutdown, that has to be arranged by whoever owns this object.
 */
public class DelegationStreamProcessor {
    private static final Logger logger = LoggerFactory.getLogger(DelegationStreamProcessor.class);

    /** Streams configuration filled in by {@link #init}. */
    final Properties properties = new Properties();
    private AnnotatedMethod annotatedProcessorMethod;
    private KafkaStreams streams;

    /**
     * Builds the topology around the annotated processor method and starts streaming.
     *
     * <p>Failures while invoking the processor method or while starting Kafka Streams are logged
     * and not rethrown. If the processor method cannot be invoked, no streams instance is started.
     *
     * @param str             value used for the {@code bootstrap.servers} setting
     * @param annotatedMethod the method carrying the {@link KafkaStream} annotation; it is invoked
     *                        once with the input {@code KStream} as its only argument
     * @param beanManager     used to look up the bean instance that declares the method
     */
    public void init(String str, AnnotatedMethod annotatedMethod, BeanManager beanManager) {
        this.annotatedProcessorMethod = annotatedMethod;
        KafkaStream kafkaStream = (KafkaStream) annotatedMethod.getAnnotation(KafkaStream.class);

        StreamsConfig streamsConfig = new StreamsConfig(populateProperties(str));
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(new String[]{kafkaStream.input()});

        if (!attachProcessor(stream, kafkaStream.output(), beanManager)) {
            // The topology is empty or only partially built; running it would consume the input
            // topic without doing the work the processor method was supposed to define.
            return;
        }
        startStreams(kStreamBuilder, streamsConfig);
    }

    /** Fills {@link #properties} with the streams settings and returns it. */
    private Properties populateProperties(String bootstrapServers) {
        // A fresh random application id is generated on every call.
        this.properties.put("application.id", "mw-cafdi-" + UUID.randomUUID().toString());
        this.properties.put("bootstrap.servers", bootstrapServers);
        this.properties.put("key.serde", Serdes.String().getClass());
        this.properties.put("value.serde", Serdes.String().getClass());
        this.properties.put("commit.interval.ms", 3000L);
        this.properties.put("auto.offset.reset", "earliest");
        return this.properties;
    }

    /**
     * Invokes the processor method with {@code source} and routes its result to {@code outputTopic}.
     *
     * @return {@code true} if the method was invoked successfully, {@code false} if the reflective
     *         call failed (the failure is logged here)
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // KStream/KTable are only known as raw types here
    private boolean attachProcessor(KStream source, String outputTopic, BeanManager beanManager) {
        Class<?> beanClass = this.annotatedProcessorMethod.getJavaMember().getDeclaringClass();
        Bean<?> bean = beanManager.resolve(beanManager.getBeans(beanClass, new Annotation[0]));
        try {
            Object target = beanManager.getReference(bean, beanClass, beanManager.createCreationalContext(bean));
            Object result = this.annotatedProcessorMethod.getJavaMember().invoke(target, source);

            // Results are written with String keys and Long values. `to` is used for both result
            // kinds: `through` would additionally re-read the output topic into a stream that
            // nobody consumes.
            if (result instanceof KStream) {
                ((KStream) result).to(Serdes.String(), Serdes.Long(), outputTopic);
            } else if (result instanceof KTable) {
                ((KTable) result).to(Serdes.String(), Serdes.Long(), outputTopic);
            }
            return true;
        } catch (IllegalAccessException | InvocationTargetException e) {
            logger.error("Could not invoke stream processor method {}",
                    this.annotatedProcessorMethod.getJavaMember(), e);
            return false;
        }
    }

    /** Creates the {@link KafkaStreams} instance, registers state tracing and starts it. */
    private void startStreams(KStreamBuilder topology, StreamsConfig config) {
        try {
            this.streams = new KafkaStreams(topology, config);
            this.streams.setStateListener((newState, oldState) -> {
                logger.trace("OLD STATE {}", oldState);
                logger.trace("NEW STATE {}", newState);
            });
            logger.trace("Starting the Streaming context");
            this.streams.start();
        } catch (Exception e) {
            // Deliberately broad: a failed start is logged and not rethrown to the caller.
            logger.error("Could not start Kafka streaming client", e);
        }
    }
}
