package io.xocore.kafka;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;
import org.codehaus.jackson.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/xocore/kafka/Consumer.class */
/**
 * Process-wide Kafka consumer that dispatches JSON records to per-topic
 * {@link ConsumerHandler}s and routes failures to a retry topic or to a
 * dead-letter topic.
 *
 * <p>Message flow implemented by this class:
 * <ul>
 *   <li>A record whose value cannot be parsed as JSON (or is null/empty) is
 *       wrapped together with the error and published to the dead-letter topic.</li>
 *   <li>A record whose handler throws is, when {@code consumerRetries > 0},
 *       wrapped in a retry envelope
 *       {@code {"message": <payload>, "retry": {"service", "topic", "errors": [...]}}}
 *       and published to {@code <record topic>-retry}. When
 *       {@code consumerRetries == 0} the failure is only logged.</li>
 *   <li>A retry envelope that fails again gets another entry appended to
 *       {@code retry.errors} and is re-published, until the number of recorded
 *       errors exceeds {@code consumerRetries}; it is then published unchanged to
 *       the dead-letter topic.</li>
 * </ul>
 *
 * <p>The underlying {@link KafkaConsumer} is created lazily by {@link #consume()}
 * and closed when {@code consume()} returns or throws. {@link #consume()} is meant
 * to be driven by a single thread; {@link #stop()} may be called from any thread.
 */
public class Consumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

    private static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";

    private static final String DEFAULT_DEAD_LETTER_TOPIC = "xoc.dead-letter-queue";
    private static final int DEFAULT_POLL_TIMEOUT = 3000;
    private static final boolean DEFAULT_AUTO_COMMIT_ENABLE = true;
    private static final int DEFAULT_AUTO_COMMIT_INTERVAL = 1000;
    private static final int DEFAULT_CONSUMER_RETRIES = 0;

    /** Suffix appended to the topic a record was read from to obtain its retry topic. */
    private static final String RETRY_TOPIC_SUFFIX = "-retry";

    private static Consumer instance = null;

    private final ObjectMapper mapper = new ObjectMapper();
    /** Not thread-safe; only used from the thread running {@link #consume()}. */
    private final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
    private final HashMap<String, ConsumerHandler> consumerHandlers = new HashMap<>();

    private final String serviceName;
    private final String serverOrigin;
    private final String groupId;
    private final String deadLetterTopic;
    private final int pollTimeout;
    private final boolean autoCommitEnable;
    private final int autoCommitInterval;
    private final int consumerRetries;
    private final Producer producer;

    /** Set by {@link #stop()}, read by the poll loop; volatile so the loop sees the update. */
    private volatile boolean stopped = false;
    private KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Returns the shared instance, creating it with default settings on first use
     * (auto-commit enabled every 1000 ms, 3000 ms poll timeout, dead-letter topic
     * {@code xoc.dead-letter-queue}, no retries).
     *
     * <p>If an instance already exists it is returned as-is and the arguments are ignored.
     *
     * @param serverOrigin value for Kafka's {@code bootstrap.servers}
     * @param groupId      value for Kafka's {@code group.id}
     * @param serviceName  name recorded in retry envelopes and passed to the {@link Producer}
     * @return the shared consumer
     */
    public static synchronized Consumer getInstance(String serverOrigin, String groupId, String serviceName) {
        if (instance == null) {
            instance = new Consumer(serverOrigin, groupId, serviceName);
        }
        return instance;
    }

    /**
     * Returns the shared instance, creating it with the given settings on first use.
     *
     * <p>If an instance already exists it is returned as-is and the arguments are ignored.
     *
     * @param serverOrigin       value for Kafka's {@code bootstrap.servers}
     * @param groupId            value for Kafka's {@code group.id}
     * @param autoCommitEnable   value for Kafka's {@code enable.auto.commit}
     * @param autoCommitInterval value for Kafka's {@code auto.commit.interval.ms}
     * @param pollTimeout        timeout passed to each {@code poll} call
     * @param deadLetterTopic    topic that receives unparseable and exhausted messages
     * @param serviceName        name recorded in retry envelopes and passed to the {@link Producer}
     * @param consumerRetries    retry budget; {@code 0} disables retrying of handler failures
     * @return the shared consumer
     */
    public static synchronized Consumer getInstance(String serverOrigin, String groupId, boolean autoCommitEnable,
            int autoCommitInterval, int pollTimeout, String deadLetterTopic, String serviceName,
            int consumerRetries) {
        if (instance == null) {
            instance = new Consumer(serverOrigin, groupId, autoCommitEnable, autoCommitInterval, pollTimeout,
                    deadLetterTopic, serviceName, consumerRetries);
        }
        return instance;
    }

    private Consumer(String serverOrigin, String groupId, String serviceName) {
        this(serverOrigin, groupId, DEFAULT_AUTO_COMMIT_ENABLE, DEFAULT_AUTO_COMMIT_INTERVAL, DEFAULT_POLL_TIMEOUT,
                DEFAULT_DEAD_LETTER_TOPIC, serviceName, DEFAULT_CONSUMER_RETRIES);
    }

    private Consumer(String serverOrigin, String groupId, boolean autoCommitEnable, int autoCommitInterval,
            int pollTimeout, String deadLetterTopic, String serviceName, int consumerRetries) {
        this.serverOrigin = serverOrigin;
        this.groupId = groupId;
        this.autoCommitEnable = autoCommitEnable;
        this.autoCommitInterval = autoCommitInterval;
        this.pollTimeout = pollTimeout;
        this.deadLetterTopic = deadLetterTopic;
        this.serviceName = serviceName;
        this.consumerRetries = consumerRetries;
        this.producer = Producer.getInstance(serverOrigin, serviceName);
    }

    /** Lazily creates the Kafka client from this instance's settings. */
    private KafkaConsumer<String, String> getKafkaConsumer() {
        if (this.kafkaConsumer == null) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.serverOrigin);
            properties.put("group.id", this.groupId);
            properties.put("enable.auto.commit", Boolean.valueOf(this.autoCommitEnable));
            properties.put("auto.commit.interval.ms", Integer.valueOf(this.autoCommitInterval));
            properties.put("key.deserializer", KEY_DESERIALIZER);
            properties.put("value.deserializer", VALUE_DESERIALIZER);
            this.kafkaConsumer = new KafkaConsumer<>(properties);
        }
        return this.kafkaConsumer;
    }

    /** Builds one {@code retry.errors} entry: the stack trace and the current timestamp. */
    private ObjectNode createErrorNode(Exception exc) {
        ObjectNode errorNode = this.mapper.createObjectNode();
        errorNode.put("exception", Arrays.toString(exc.getStackTrace()));
        errorNode.put("@timestamp", this.dateFormatter.format(new Date()));
        return errorNode;
    }

    /** Builds the {@code retry} section of a new envelope, seeded with a first error entry. */
    private ObjectNode createRetry(Exception exc, String topic) {
        ArrayNode errors = this.mapper.createArrayNode();
        errors.add(createErrorNode(exc));
        ObjectNode retry = this.mapper.createObjectNode();
        retry.put("service", this.serviceName);
        retry.put("topic", topic);
        retry.put("errors", errors);
        return retry;
    }

    /**
     * Tells whether {@code node} has the shape of an envelope produced by this class:
     * a {@code message} field plus a {@code retry} object carrying a {@code topic} and an
     * {@code errors} array. Checking the full shape keeps ordinary payloads that merely
     * contain a {@code message} or {@code retry} field from being mistaken for envelopes.
     */
    private static boolean isRetryEnvelope(JsonNode node) {
        if (!node.has("message") || !node.has("retry")) {
            return false;
        }
        JsonNode retry = node.get("retry");
        JsonNode errors = retry.get("errors");
        return retry.has("topic") && errors != null && errors.isArray();
    }

    /**
     * Wraps the raw (unparsed) record value with error details and publishes it to
     * {@code targetTopic}.
     */
    private void handleConsumerException(Exception exc, ConsumerRecord<String, String> consumerRecord,
            String targetTopic) {
        ObjectNode envelope = this.mapper.createObjectNode();
        envelope.put("message", consumerRecord.value());
        envelope.put("retry", createRetry(exc, consumerRecord.topic()));
        this.producer.produce(targetTopic, envelope.toString());
    }

    /**
     * Routes a parsed payload whose handler failed.
     *
     * <p>A payload that is not yet an envelope is wrapped and sent to {@code retryTopic}.
     * An existing envelope is sent to the dead-letter topic once its recorded errors exceed
     * {@code consumerRetries}; otherwise the new error is appended and it goes to
     * {@code retryTopic} again.
     *
     * <p>NOTE(review): with this comparison an envelope is re-published
     * {@code consumerRetries + 1} times before being dead-lettered; confirm that is the
     * intended budget.
     *
     * @param exc         the failure raised while handling the payload
     * @param payload     the parsed record value (any JSON node type)
     * @param retryTopic  topic to publish the retry envelope to
     * @param sourceTopic topic the record was read from; stored as {@code retry.topic}
     *                    when a new envelope is created
     */
    private void handleConsumerException(Exception exc, JsonNode payload, String retryTopic, String sourceTopic) {
        if (!isRetryEnvelope(payload)) {
            ObjectNode envelope = this.mapper.createObjectNode();
            envelope.put("message", payload);
            envelope.put("retry", createRetry(exc, sourceTopic));
            this.producer.produce(retryTopic, envelope.toString());
            return;
        }
        // Safe cast: isRetryEnvelope verified that retry.errors is an array node.
        ArrayNode errors = (ArrayNode) payload.get("retry").get("errors");
        if (errors.size() > this.consumerRetries) {
            this.producer.produce(this.deadLetterTopic, payload.toString());
        } else {
            errors.add(createErrorNode(exc));
            this.producer.produce(retryTopic, payload.toString());
        }
    }

    /**
     * Registers the handler for a topic. All topics registered before {@link #consume()}
     * is called are subscribed to.
     *
     * @param topic           topic name
     * @param consumerHandler handler invoked with each message read for that topic
     */
    public void addConsumerHandler(String topic, ConsumerHandler consumerHandler) {
        this.consumerHandlers.put(topic, consumerHandler);
    }

    /** Asks the poll loop in {@link #consume()} to exit after the current batch. */
    public void stop() {
        this.stopped = true;
    }

    /**
     * Subscribes to every registered topic and processes records until {@link #stop()}
     * is called. The Kafka client is closed when this method returns or throws.
     *
     * @throws IllegalStateException if no handler has been registered
     * @throws Exception             declared for backward compatibility with existing callers
     */
    public void consume() throws Exception {
        if (this.consumerHandlers.isEmpty()) {
            LOGGER.error("There are no registered topics.");
            throw new IllegalStateException("There are no registered topics.");
        }
        KafkaConsumer<String, String> consumer = getKafkaConsumer();
        try {
            consumer.subscribe(new ArrayList<>(this.consumerHandlers.keySet()));
            while (!this.stopped) {
                for (ConsumerRecord<String, String> consumerRecord : consumer.poll(this.pollTimeout)) {
                    processRecord(consumerRecord);
                }
            }
        } finally {
            // Drop the reference first so a later consume() builds a fresh client
            // instead of reusing a closed one.
            this.kafkaConsumer = null;
            consumer.close();
        }
    }

    /** Parses one record, dispatches it to its handler and routes any failure. */
    private void processRecord(ConsumerRecord<String, String> consumerRecord) {
        LOGGER.info("topic = {}, offset = {}, key = {}, value = {}", new Object[]{consumerRecord.topic(),
                Long.valueOf(consumerRecord.offset()), consumerRecord.key(), consumerRecord.value()});

        JsonNode payload;
        try {
            String value = consumerRecord.value();
            payload = (value == null) ? null : this.mapper.readTree(value);
            if (payload == null) {
                // Null values and empty documents carry nothing to dispatch; treat them like
                // any other unparseable record instead of letting an NPE end the poll loop.
                throw new IOException("Record value is null or empty");
            }
        } catch (IOException e) {
            LOGGER.error("Unparseable record on topic " + consumerRecord.topic() + " at offset "
                    + consumerRecord.offset(), e);
            handleConsumerException(e, consumerRecord, this.deadLetterTopic);
            return;
        }

        try {
            String handlerTopic = consumerRecord.topic();
            JsonNode message = payload;
            if (isRetryEnvelope(payload)) {
                // Retried messages are dispatched to the handler of the topic recorded in the envelope.
                message = payload.get("message");
                handlerTopic = payload.get("retry").get("topic").asText();
            }
            ConsumerHandler handler = this.consumerHandlers.get(handlerTopic);
            if (handler == null) {
                throw new IllegalStateException("No consumer handler registered for topic " + handlerTopic);
            }
            handler.run(message);
        } catch (Exception e) {
            LOGGER.error("Failed to process record from topic " + consumerRecord.topic() + " at offset "
                    + consumerRecord.offset(), e);
            if (this.consumerRetries > 0) {
                // NOTE(review): the retry topic is derived from the topic the record was read
                // from, so a failure while consuming "<t>-retry" is published to
                // "<t>-retry-retry"; confirm this chaining is intended.
                handleConsumerException(e, payload, consumerRecord.topic() + RETRY_TOPIC_SUFFIX,
                        consumerRecord.topic());
            }
        }
    }
}
