package com.weicoder.kafka.init;

import com.weicoder.common.U;
import com.weicoder.common.W;
import com.weicoder.common.concurrent.ScheduledUtil;
import com.weicoder.common.init.Init;
import com.weicoder.common.lang.Bytes;
import com.weicoder.common.lang.Lists;
import com.weicoder.common.lang.Maps;
import com.weicoder.common.log.Log;
import com.weicoder.common.log.LogFactory;
import com.weicoder.common.util.BeanUtil;
import com.weicoder.common.util.ClassUtil;
import com.weicoder.common.util.StringUtil;
import com.weicoder.json.JsonEngine;
import com.weicoder.kafka.annotation.AllTopic;
import com.weicoder.kafka.annotation.Consumer;
import com.weicoder.kafka.annotation.Topic;
import com.weicoder.kafka.consumer.Record;
import com.weicoder.kafka.factory.KafkaFactory;
import com.weicoder.kafka.params.KafkaParams;
import com.weicoder.protobuf.Protobuf;
import com.weicoder.protobuf.ProtobufEngine;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:com/weicoder/kafka/init/KafkaInit.class */
public class KafkaInit implements Init {
    private Log LOG = LogFactory.getLog(KafkaInit.class);
    private Map<String, Object> CONSUMERS = Maps.newMap();
    private Map<String, Method> METHODS = Maps.newMap();
    private Map<String, List<Method>> ALL_TOPICS = Maps.newMap();
    private Map<String, KafkaConsumer<byte[], byte[]>> KAFKA_CONSUMERS = Maps.newMap();
    private Map<String, List<String>> TOPICS = Maps.newMap();
    private Map<String, Map<String, Queue<ConsumerRecord<byte[], byte[]>>>> TOPIC_RECORDS = Maps.newConcurrentMap();

    public void init() {
        List list = U.C.list(Consumer.class);
        if (U.E.isNotEmpty(list)) {
            list.forEach(cls -> {
                Object newInstance = ClassUtil.newInstance(cls, new Class[0]);
                String value = ((Consumer) newInstance.getClass().getAnnotation(Consumer.class)).value();
                Map<String, Queue<ConsumerRecord<byte[], byte[]>>> map = this.TOPIC_RECORDS.get(value);
                if (!this.KAFKA_CONSUMERS.containsKey(value)) {
                    this.KAFKA_CONSUMERS.put(value, KafkaFactory.getConsumer(value));
                    Map<String, Map<String, Queue<ConsumerRecord<byte[], byte[]>>>> map2 = this.TOPIC_RECORDS;
                    ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
                    map = newConcurrentMap;
                    map2.put(value, newConcurrentMap);
                }
                List list2 = Maps.getList(this.TOPICS, value);
                for (Method method : cls.getDeclaredMethods()) {
                    Topic topic = (Topic) method.getAnnotation(Topic.class);
                    if (topic != null) {
                        String value2 = topic.value();
                        this.METHODS.put(value2, method);
                        this.CONSUMERS.put(value2, newInstance);
                        list2.add(value2);
                        map.put(value2, new ConcurrentLinkedQueue());
                        this.LOG.info("add kafka consumer={} topic={}", new Object[]{cls.getSimpleName(), value2});
                    } else if (((AllTopic) method.getAnnotation(AllTopic.class)) != null) {
                        List<Method> list3 = this.ALL_TOPICS.get(value);
                        if (list3 == null) {
                            Map<String, List<Method>> map3 = this.ALL_TOPICS;
                            List<Method> newList = Lists.newList();
                            list3 = newList;
                            map3.put(value, newList);
                        }
                        list3.add(method);
                    }
                }
            });
            this.LOG.info("add kafka Consumers size={}", new Object[]{Integer.valueOf(list.size())});
            this.TOPICS.keySet().forEach(str -> {
                List<String> list2 = this.TOPICS.get(str);
                this.KAFKA_CONSUMERS.get(str).subscribe(list2);
                this.LOG.info("Kafkas init Consumer={} subscribe topic={}", new Object[]{str, list2});
            });
            ScheduledUtil.newDelay(() -> {
                this.KAFKA_CONSUMERS.forEach((str2, kafkaConsumer) -> {
                    long id = Thread.currentThread().getId();
                    long currentTimeMillis = System.currentTimeMillis();
                    int i = 0;
                    Map<String, Queue<ConsumerRecord<byte[], byte[]>>> map = this.TOPIC_RECORDS.get(str2);
                    Iterator it = kafkaConsumer.poll(Duration.ofSeconds(1L)).iterator();
                    while (it.hasNext()) {
                        ConsumerRecord<byte[], byte[]> consumerRecord = (ConsumerRecord) it.next();
                        this.LOG.debug("kafka read consumer thread={} record={}", new Object[]{Long.valueOf(id), consumerRecord});
                        map.get(consumerRecord.topic()).add(consumerRecord);
                        i++;
                    }
                    if (i > 0) {
                        this.LOG.info("kafka read consumer end name={} size={} time={} thread={}", new Object[]{str2, Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(id)});
                    }
                });
            }, 0L, 10L);
            this.TOPIC_RECORDS.forEach((str2, map) -> {
                map.values().forEach(queue -> {
                    ScheduledUtil.delay(KafkaParams.PREFIX, () -> {
                        long id = Thread.currentThread().getId();
                        long currentTimeMillis = System.currentTimeMillis();
                        int i = 0;
                        String str2 = null;
                        long j = 0;
                        while (true) {
                            ConsumerRecord consumerRecord = (ConsumerRecord) queue.poll();
                            if (consumerRecord == null) {
                                break;
                            }
                            str2 = consumerRecord.topic();
                            j = consumerRecord.offset();
                            Object obj = this.CONSUMERS.get(str2);
                            Method method = this.METHODS.get(str2);
                            Parameter[] parameters = method.getParameters();
                            Object[] objArr = null;
                            if (U.E.isEmpty(parameters)) {
                                BeanUtil.invoke(obj, method, new Object[0]);
                            } else {
                                objArr = new Object[parameters.length];
                                Parameter parameter = parameters[0];
                                Class<?> type = parameter.getType();
                                if (parameters.length != 1) {
                                    objArr[0] = toParam((byte[]) consumerRecord.key(), type);
                                    objArr[1] = toParam((byte[]) consumerRecord.value(), parameters[1].getType());
                                } else if (ConsumerRecord.class.equals(type)) {
                                    objArr[0] = consumerRecord;
                                } else if (Record.class.equals(type)) {
                                    Class[] genericClass = ClassUtil.getGenericClass(parameter.getParameterizedType());
                                    objArr[0] = new Record(consumerRecord.topic(), toParam((byte[]) consumerRecord.key(), genericClass[0]), toParam((byte[]) consumerRecord.value(), genericClass[1]), consumerRecord.offset(), consumerRecord.timestamp());
                                    List<Method> list2 = this.ALL_TOPICS.get(str2);
                                    if (U.E.isEmpty(list2)) {
                                        Iterator<Method> it = list2.iterator();
                                        while (it.hasNext()) {
                                            BeanUtil.invoke(obj, it.next(), objArr);
                                        }
                                    }
                                } else {
                                    objArr[0] = toParam((byte[]) consumerRecord.value(), type);
                                }
                                BeanUtil.invoke(obj, method, objArr);
                            }
                            this.LOG.debug("kafka consumer topic={} offset={} method={} args={} params={} thread={}", new Object[]{str2, Long.valueOf(j), method.getName(), objArr, parameters, Long.valueOf(id)});
                            i++;
                        }
                        if (i > 0) {
                            this.LOG.info("kafka consumer end topic={} offset={} size={} time={} thread={}", new Object[]{str2, Long.valueOf(j), Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(id)});
                        }
                    }, 10L);
                });
            });
        }
    }

    private static Object toParam(byte[] bArr, Class<?> cls) {
        return String.class.equals(cls) ? U.S.toString(bArr) : Map.class.equals(cls) ? JsonEngine.toMap(StringUtil.toString(bArr)) : List.class.equals(cls) ? JsonEngine.toList(StringUtil.toString(bArr)) : cls.isAnnotationPresent(Protobuf.class) ? ProtobufEngine.toBean(bArr, cls) : W.B.isType(cls) ? Bytes.to(bArr, cls) : JsonEngine.toBean(U.S.toString(bArr), cls);
    }
}
