package ai.superstream.core;

import ai.superstream.model.MetadataMessage;
import ai.superstream.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import ai.superstream.util.SuperstreamLogger;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

/* loaded from: input_file:ai/superstream/core/MetadataConsumer.class */
/**
 * Fetches the record stored at the last offset of partition 0 of the
 * {@code superstream.metadata_v1} topic and deserializes its JSON value into a
 * {@link MetadataMessage}.
 *
 * <p>Every call builds its own short-lived {@link KafkaConsumer} and closes it before returning.
 * All failures are logged and reported to the caller as a {@code null} result.
 */
public class MetadataConsumer {
    private static final String METADATA_TOPIC = "superstream.metadata_v1";
    private static final SuperstreamLogger logger = SuperstreamLogger.getLogger(MetadataConsumer.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /** Upper bound on how long a single call waits for the record to be fetched. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(5L);

    /**
     * Keys that are carried over from the caller's client configuration: security (SSL/SASL)
     * settings plus a handful of connection timeout/backoff settings.
     */
    private static final String[] COPIED_PROPERTY_KEYS = {
        "security.protocol",
        "ssl.truststore.location",
        "ssl.truststore.password",
        "ssl.keystore.location",
        "ssl.keystore.password",
        "ssl.key.password",
        "ssl.endpoint.identification.algorithm",
        "ssl.truststore.type",
        "ssl.keystore.type",
        "ssl.secure.random.implementation",
        "ssl.enabled.protocols",
        "ssl.cipher.suites",
        "sasl.mechanism",
        "sasl.jaas.config",
        "sasl.client.callback.handler.class",
        "sasl.login.callback.handler.class",
        "sasl.login.class",
        "sasl.kerberos.service.name",
        "sasl.kerberos.kinit.cmd",
        "sasl.kerberos.ticket.renew.window.factor",
        "sasl.kerberos.ticket.renew.jitter",
        "sasl.kerberos.min.time.before.relogin",
        "sasl.login.refresh.window.factor",
        "sasl.login.refresh.window.jitter",
        "sasl.login.refresh.min.period.seconds",
        "sasl.login.refresh.buffer.seconds",
        "request.timeout.ms",
        "retry.backoff.ms",
        "connections.max.idle.ms",
        "reconnect.backoff.ms",
        "reconnect.backoff.max.ms"
    };

    /**
     * Reads the metadata message from the Kafka cluster reachable at {@code bootstrapServers}.
     *
     * @param bootstrapServers value used for the consumer's {@code bootstrap.servers} setting
     * @param originalProperties client configuration whose security and connection settings are
     *     reused for the temporary consumer; {@code null} is treated as "nothing to copy"
     * @return the parsed message, or {@code null} when the bootstrap servers are missing, the
     *     topic does not exist, the partition holds no records, no record arrives within the poll
     *     timeout, or any error occurs while connecting, reading or parsing
     */
    public MetadataMessage getMetadataMessage(String bootstrapServers, Properties originalProperties) {
        if (bootstrapServers == null) {
            // Properties rejects null values, so fail the same way as every other error path
            // instead of letting a NullPointerException escape.
            logger.warn("No bootstrap servers were provided; cannot read the {} topic", METADATA_TOPIC);
            return null;
        }

        Properties consumerProperties = new Properties();
        copyAuthenticationProperties(originalProperties, consumerProperties);
        consumerProperties.put("bootstrap.servers", bootstrapServers);
        consumerProperties.put("group.id", "superstream-metadata-consumer-" + UUID.randomUUID());
        consumerProperties.put("key.deserializer", StringDeserializer.class.getName());
        consumerProperties.put("value.deserializer", StringDeserializer.class.getName());
        consumerProperties.put("auto.offset.reset", "earliest");
        consumerProperties.put("client.id", "superstreamlib-metadata-consumer");
        // Nothing ever resumes from this throwaway group, so do not commit offsets under it.
        consumerProperties.put("enable.auto.commit", "false");

        // try-with-resources closes the consumer on every exit path before the catch blocks run.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            if (!consumer.listTopics().containsKey(METADATA_TOPIC)) {
                logger.warn("The {} topic does not exist on the Kafka cluster at {}", METADATA_TOPIC, bootstrapServers);
                return null;
            }

            TopicPartition topicPartition = new TopicPartition(METADATA_TOPIC, 0);
            consumer.assign(Collections.singletonList(topicPartition));
            consumer.seekToEnd(Collections.singletonList(topicPartition));
            long endOffset = consumer.position(topicPartition);

            // The partition is empty when the end offset has not moved past the first available
            // offset. Comparing against 0 alone misses partitions whose old records were deleted.
            long startOffset = consumer
                    .beginningOffsets(Collections.singletonList(topicPartition))
                    .getOrDefault(topicPartition, 0L);
            if (endOffset <= startOffset) {
                logger.warn("The {} topic is empty", METADATA_TOPIC);
                return null;
            }

            // Step back one offset so the next poll starts at the last record.
            consumer.seek(topicPartition, endOffset - 1);
            ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);
            if (records.isEmpty()) {
                logger.warn("Failed to retrieve a message from the {} topic", METADATA_TOPIC);
                return null;
            }

            ConsumerRecord<String, String> firstRecord = records.iterator().next();
            return objectMapper.readValue(firstRecord.value(), MetadataMessage.class);
        } catch (IOException e) {
            logger.error("Failed to parse the metadata message", e);
            return null;
        } catch (Exception e) {
            logger.error("Failed to retrieve the metadata message", e);
            return null;
        }
    }

    /**
     * Copies every key listed in {@link #COPIED_PROPERTY_KEYS} that is present in {@code source}
     * into {@code target}; keys absent from {@code source} are left untouched in {@code target}.
     *
     * @param source configuration to read from; {@code null} copies nothing
     * @param target configuration to write into
     */
    private void copyAuthenticationProperties(Properties source, Properties target) {
        if (source == null) {
            return;
        }
        for (String key : COPIED_PROPERTY_KEYS) {
            if (source.containsKey(key)) {
                target.put(key, source.get(key));
            }
        }
    }
}
