package net.lightapi.portal.user.command.handler;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.httpstring.AttachmentConstants;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.rpc.HybridHandler;
import com.networknt.rpc.router.ServiceHandler;
import com.networknt.utility.NioUtils;
import io.undertow.server.HttpServerExchange;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ServiceHandler(id = "lightapi.net/user/exportPortalEvent/0.1.0")
/* loaded from: input_file:net/lightapi/portal/user/command/handler/ExportPortalEvent.class */
public class ExportPortalEvent implements HybridHandler {
    protected static final String INCORRECT_TOKEN_TYPE = "ERR11601";
    protected static final String KAFKA_ACCESS_ERROR = "ERR11643";
    protected static final String INVALID_VARIABLE_FORMAT = "ERR11644";
    private static final Logger logger = LoggerFactory.getLogger(ExportPortalEvent.class);
    public static final KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig("kafka-consumer", KafkaConsumerConfig.class);
    private static final Class<?> keyDeserializer = ByteArrayDeserializer.class;
    private static final Class<?> valueDeserializer = ByteArrayDeserializer.class;

    /* loaded from: input_file:net/lightapi/portal/user/command/handler/ExportPortalEvent$ExportConsumerRebalanceListener.class */
    private static class ExportConsumerRebalanceListener implements ConsumerRebalanceListener {
        private ExportConsumerRebalanceListener() {
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            System.out.println("Called onPartitionsRevoked with partitions:" + String.valueOf(collection));
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            System.out.println("Called onPartitionsAssigned with partitions:" + String.valueOf(collection));
        }
    }

    public ByteBuffer handle(HttpServerExchange httpServerExchange, Object obj) {
        Map map = (Map) obj;
        String str = (String) map.get("hostId");
        String str2 = (String) map.get("startTs");
        String str3 = (String) map.get("endTs");
        String str4 = (String) map.get("portalServices");
        if (logger.isTraceEnabled()) {
            logger.trace("hostId = {}, startTs = {}, endTs = {}, portalServices = {}", new Object[]{str, str2, str3, str4});
        }
        String str5 = (String) ((Map) httpServerExchange.getAttachment(AttachmentConstants.AUDIT_INFO)).get("user_id");
        if (str5 == null) {
            logger.error("Incorrect token type: userId is null. Must be Authorization Code Token.");
            return NioUtils.toByteBuffer(getStatus(httpServerExchange, INCORRECT_TOKEN_TYPE, new Object[]{"Authorization Code Token"}));
        }
        try {
            long epochMilli = OffsetDateTime.parse(str2).toInstant().toEpochMilli();
            long epochMilli2 = OffsetDateTime.parse(str3).toInstant().toEpochMilli();
            Properties properties = new Properties();
            properties.putAll(config.getProperties());
            properties.put("key.deserializer", keyDeserializer);
            properties.put("value.deserializer", valueDeserializer);
            properties.put("group.id", "export-group-" + str5 + "-" + System.currentTimeMillis());
            properties.put("enable.auto.commit", "false");
            properties.put("auto.offset.reset", "none");
            StringBuilder sb = new StringBuilder();
            KafkaConsumer kafkaConsumer = null;
            if (logger.isTraceEnabled()) {
                logger.trace("props = {}", JsonMapper.toJson(properties));
            }
            try {
                try {
                    KafkaConsumer kafkaConsumer2 = new KafkaConsumer(properties);
                    if (logger.isTraceEnabled()) {
                        logger.trace("Kafka consumer created.");
                    }
                    List partitionsFor = kafkaConsumer2.partitionsFor(config.getTopic());
                    if (partitionsFor == null || partitionsFor.isEmpty()) {
                        logger.warn("No partitions found for topic: {}", config.getTopic());
                        ByteBuffer byteBuffer = NioUtils.toByteBuffer("");
                        if (kafkaConsumer2 != null) {
                            kafkaConsumer2.close();
                            logger.debug("Kafka consumer closed.");
                        }
                        return byteBuffer;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Partitions for topic {}: {}", config.getTopic(), partitionsFor);
                    }
                    List list = (List) partitionsFor.stream().map(partitionInfo -> {
                        return new TopicPartition(config.getTopic(), partitionInfo.partition());
                    }).collect(Collectors.toList());
                    if (logger.isTraceEnabled()) {
                        logger.trace("Topic partitions: {}", list);
                    }
                    Map offsetsForTimes = kafkaConsumer2.offsetsForTimes((Map) list.stream().collect(Collectors.toMap(topicPartition -> {
                        return topicPartition;
                    }, topicPartition2 -> {
                        return Long.valueOf(epochMilli);
                    })));
                    if (logger.isTraceEnabled()) {
                        logger.trace("Starting offsets for timestamps: {}", offsetsForTimes);
                    }
                    kafkaConsumer2.assign(list);
                    if (logger.isTraceEnabled()) {
                        logger.trace("Assigned partitions: {}", list);
                    }
                    offsetsForTimes.forEach((topicPartition3, offsetAndTimestamp) -> {
                        if (offsetAndTimestamp != null) {
                            logger.debug("Seeking partition {} to offset {}", Integer.valueOf(topicPartition3.partition()), Long.valueOf(offsetAndTimestamp.offset()));
                            kafkaConsumer2.seek(topicPartition3, offsetAndTimestamp.offset());
                        } else {
                            logger.debug("No offset found for partition {} >= startTs {}, seeking to end.", Integer.valueOf(topicPartition3.partition()), str2);
                            kafkaConsumer2.seekToEnd(Collections.singletonList(topicPartition3));
                        }
                    });
                    int i = 0;
                    boolean z = true;
                    while (z) {
                        ConsumerRecords poll = kafkaConsumer2.poll(Duration.ofMillis(1000L));
                        if (poll.isEmpty()) {
                            i++;
                            if (i >= 5) {
                                logger.info("No more records found after {} polls, stopping.", 5);
                                z = false;
                            }
                        } else {
                            i = 0;
                            for (TopicPartition topicPartition4 : poll.partitions()) {
                                Iterator it = poll.records(topicPartition4).iterator();
                                while (true) {
                                    if (it.hasNext()) {
                                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                        if (consumerRecord.timestamp() > epochMilli2) {
                                            logger.debug("Record timestamp {} exceeds endTs {} for partition {}, stopping processing for this partition in this poll.", new Object[]{Long.valueOf(consumerRecord.timestamp()), Long.valueOf(epochMilli2), Integer.valueOf(topicPartition4.partition())});
                                            break;
                                        }
                                        sb.append("key=").append(new String((byte[]) consumerRecord.key(), StandardCharsets.UTF_8)).append(" value=").append(new String((byte[]) consumerRecord.value(), StandardCharsets.UTF_8)).append("\n");
                                    }
                                }
                            }
                        }
                    }
                    if (kafkaConsumer2 != null) {
                        kafkaConsumer2.close();
                        logger.debug("Kafka consumer closed.");
                    }
                    String sb2 = sb.toString();
                    if (logger.isTraceEnabled()) {
                        logger.trace("Export result length = {}", Integer.valueOf(sb2.length()));
                    }
                    return NioUtils.toByteBuffer(sb2);
                } catch (Exception e) {
                    logger.error("Error during Kafka export processing", e);
                    ByteBuffer byteBuffer2 = NioUtils.toByteBuffer(getStatus(httpServerExchange, KAFKA_ACCESS_ERROR, new Object[]{"Kafka processing error: " + e.getMessage()}));
                    if (0 != 0) {
                        kafkaConsumer.close();
                        logger.debug("Kafka consumer closed.");
                    }
                    return byteBuffer2;
                }
            } catch (Throwable th) {
                if (0 != 0) {
                    kafkaConsumer.close();
                    logger.debug("Kafka consumer closed.");
                }
                throw th;
            }
        } catch (Exception e2) {
            logger.error("Invalid timestamp format. startTs={}, endTs={}", new Object[]{str2, str3, e2});
            return NioUtils.toByteBuffer(getStatus(httpServerExchange, INVALID_VARIABLE_FORMAT, new Object[]{"startTs or endTs", str2 + " or " + str3}));
        }
    }
}
