package com.weicoder.nosql.kafka;

import com.weicoder.common.concurrent.ScheduledUtil;
import com.weicoder.common.lang.Bytes;
import com.weicoder.common.lang.Conversion;
import com.weicoder.common.lang.Maps;
import com.weicoder.common.log.Logs;
import com.weicoder.common.params.CommonParams;
import com.weicoder.common.util.BeanUtil;
import com.weicoder.common.util.ClassUtil;
import com.weicoder.common.util.DateUtil;
import com.weicoder.common.util.EmptyUtil;
import com.weicoder.common.util.StringUtil;
import com.weicoder.nosql.kafka.annotation.Consumer;
import com.weicoder.nosql.kafka.annotation.Topic;
import com.weicoder.nosql.kafka.factory.KafkaFactory;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/weicoder/nosql/kafka/Kafkas.class */
/**
 * Static entry point that wires {@link Consumer}-annotated classes to Kafka topics and builds
 * producer records.
 *
 * <p>{@link #init()} scans for {@link Consumer} classes, maps each {@link Topic}-annotated method
 * to its topic, subscribes one {@link KafkaConsumer} per consumer name and schedules a polling
 * task that reflectively invokes the mapped method for every record received.
 *
 * <p>Handlers are looked up by topic name only, so when two methods declare the same topic the
 * one registered last receives the records.
 */
public final class Kafkas {
    /** Handler instance per topic, keyed by topic name. */
    private static final Map<String, Object> CONSUMERS = Maps.newMap();
    /** Handler method per topic, keyed by topic name. */
    private static final Map<String, Method> METHODS = Maps.newMap();
    /** Kafka client per consumer name (the {@link Consumer#value()} of the handler class). */
    private static final Map<String, KafkaConsumer<byte[], byte[]>> KAFKA_CONSUMERS = Maps.newMap();
    /** Topics to subscribe to, grouped by consumer name. */
    private static final Map<String, List<String>> TOPICS = Maps.newMap();

    /**
     * Discovers all {@link Consumer} classes in the packages configured under {@code "kafka"},
     * registers their {@link Topic} methods, subscribes the Kafka clients and schedules one
     * polling task per client. Returns without doing anything when no consumer class is found.
     */
    // The element type returned by ClassUtil.getAnnotationClass is not visible from this file,
    // so the raw Class declaration is kept and only the raw-type warning is suppressed.
    @SuppressWarnings("rawtypes")
    public static void init() {
        List<Class> consumerClasses = ClassUtil.getAnnotationClass(CommonParams.getPackages("kafka"), Consumer.class);
        if (EmptyUtil.isEmpty(consumerClasses)) {
            return;
        }
        for (Class consumerClass : consumerClasses) {
            Object handler = BeanUtil.newInstance(consumerClass);
            String consumerName = handler.getClass().getAnnotation(Consumer.class).value();
            // One Kafka client is shared by every handler class that declares the same name.
            if (!KAFKA_CONSUMERS.containsKey(consumerName)) {
                KAFKA_CONSUMERS.put(consumerName, KafkaFactory.getConsumer(consumerName));
            }
            List<String> topics = Maps.getList(TOPICS, consumerName, String.class);
            for (Method method : consumerClass.getMethods()) {
                Topic topic = method.getAnnotation(Topic.class);
                if (topic == null) {
                    continue;
                }
                String topicName = topic.value();
                METHODS.put(topicName, method);
                CONSUMERS.put(topicName, handler);
                topics.add(topicName);
                Logs.info("add kafka Consumer={} topic={}", consumerClass.getSimpleName(), topicName);
            }
        }
        Logs.info("add kafka Consumers size={}", consumerClasses.size());
        for (Map.Entry<String, List<String>> entry : TOPICS.entrySet()) {
            KAFKA_CONSUMERS.get(entry.getKey()).subscribe(entry.getValue());
            Logs.info("Kafkas init Consumer={} subscribe topic={}", entry.getKey(), entry.getValue());
        }
        for (KafkaConsumer<byte[], byte[]> kafkaConsumer : KAFKA_CONSUMERS.values()) {
            ScheduledUtil.delay(() -> pollOnce(kafkaConsumer), 1);
        }
    }

    /**
     * Builds a key-less record for the given topic.
     *
     * @param str topic name
     * @param obj value; a {@code String} is encoded as UTF-8, anything else via {@code Bytes.toBytes}
     * @return the record to hand to a producer
     */
    public static ProducerRecord<byte[], byte[]> newRecord(String str, Object obj) {
        return new ProducerRecord<>(str, toBytes(obj));
    }

    /**
     * Builds a keyed record for the given topic.
     *
     * @param str  topic name
     * @param obj  record key, encoded like the value
     * @param obj2 record value; a {@code String} is encoded as UTF-8, anything else via
     *             {@code Bytes.toBytes}
     * @return the record to hand to a producer
     */
    public static ProducerRecord<byte[], byte[]> newRecord(String str, Object obj, Object obj2) {
        return new ProducerRecord<>(str, toBytes(obj), toBytes(obj2));
    }

    /** Polls the client once (1000 ms timeout) and dispatches every record returned. */
    private static void pollOnce(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
        int start = DateUtil.getTime();
        int handled = 0;
        for (ConsumerRecord<byte[], byte[]> message : kafkaConsumer.poll(1000L)) {
            Logs.debug("kafka read consumer record={}", message);
            dispatch(message);
            handled++;
        }
        if (handled > 0) {
            Logs.debug("kafka consumer end size={}  time={}", handled, DateUtil.getTime() - start);
        }
    }

    /**
     * Invokes the handler method registered for the record's topic.
     *
     * <p>A one-parameter method receives the record value; a method with two or more parameters
     * receives the key first and the value second (any further arguments stay {@code null});
     * a parameterless method is called without arguments.
     */
    private static void dispatch(ConsumerRecord<byte[], byte[]> message) {
        String topic = message.topic();
        Object handler = CONSUMERS.get(topic);
        Method method = METHODS.get(topic);
        Parameter[] parameters = method.getParameters();
        Object[] args = new Object[parameters.length];
        if (parameters.length == 1) {
            args[0] = toParam(message.value(), parameters[0].getType());
        } else if (parameters.length > 1) {
            args[0] = toParam(message.key(), parameters[0].getType());
            args[1] = toParam(message.value(), parameters[1].getType());
        }
        BeanUtil.invoke(handler, method, args);
        Logs.debug("kafka consumer method={} params={} args={}", method.getName(), parameters, args);
    }

    /** Converts raw record bytes into the type a handler parameter declares. */
    private static Object toParam(byte[] bArr, Class<?> cls) {
        if (String.class.equals(cls)) {
            return StringUtil.toString(bArr);
        }
        return Bytes.to(bArr, cls);
    }

    /**
     * Serializes a key or value for a producer record.
     *
     * <p>Strings are encoded with an explicit UTF-8 charset so the bytes do not depend on the
     * platform default of the producing JVM.
     * NOTE(review): the consuming side decodes with {@code StringUtil.toString(byte[])}, which is
     * presumed to use UTF-8 as well - confirm against that helper.
     */
    private static byte[] toBytes(Object obj) {
        if (obj instanceof String) {
            return Conversion.toString(obj).getBytes(StandardCharsets.UTF_8);
        }
        return Bytes.toBytes(obj);
    }

    private Kafkas() {
    }
}
