package org.phoebus.applications.alarm.client;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Level;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.phoebus.applications.alarm.AlarmSystem;

/* loaded from: input_file:BOOT-INF/lib/app-alarm-model-4.7.1.jar:org/phoebus/applications/alarm/client/KafkaHelper.class */
/**
 * Static helpers for creating the Kafka consumer, producer and streams
 * objects used by the alarm client.
 *
 * <p>Each factory starts from the properties loaded via
 * {@link #loadPropsFromFile(String)} and then sets the entries it needs,
 * overriding any same-named entries from the file.
 */
public class KafkaHelper {
    /** Property key under which the Kafka server list is stored. */
    private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";

    /**
     * Creates a consumer subscribed to the given topics.
     *
     * <p>If the loaded properties do not define a group id, a unique one of the
     * form {@code Alarm-<random UUID>} is generated.
     *
     * @param str   value stored under {@code bootstrap.servers}
     * @param list  topics to subscribe to
     * @param list2 topics for which, whenever one of their partitions is
     *              assigned, the consumer seeks to the beginning of that partition
     * @param str2  path of a properties file with additional settings;
     *              may be {@code null} or blank for none
     * @return the subscribed consumer
     */
    public static Consumer<String, String> connectConsumer(String str, List<String> list, final List<String> list2, String str2) {
        Properties props = loadPropsFromFile(str2);
        props.put(BOOTSTRAP_SERVERS, str);
        if (!props.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "Alarm-" + UUID.randomUUID());
        }
        AlarmSystem.logger.fine(props.getProperty(ConsumerConfig.GROUP_ID_CONFIG) + " subscribes to " + str + " for " + list);

        // One deserializer instance serves both key and value, as both are strings.
        StringDeserializer deserializer = new StringDeserializer();
        final Consumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer);
        consumer.subscribe(list, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition partition : collection) {
                    if (list2.contains(partition.topic())) {
                        // Topic was requested to be read from its start.
                        consumer.seekToBeginning(List.of(partition));
                        AlarmSystem.logger.info("Reading from start of " + partition.topic());
                    } else {
                        AlarmSystem.logger.info("Reading updates for " + partition.topic());
                    }
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                // Nothing to do when partitions are revoked.
            }
        });
        return consumer;
    }

    /**
     * Creates a producer with string keys and values.
     *
     * <p>{@code linger.ms} is always set to 20, overriding any value from the
     * properties file.
     *
     * @param str  value stored under {@code bootstrap.servers}
     * @param str2 path of a properties file with additional settings;
     *             may be {@code null} or blank for none
     * @return the new producer
     */
    public static Producer<String, String> connectProducer(String str, String str2) {
        Properties props = loadPropsFromFile(str2);
        props.put(BOOTSTRAP_SERVERS, str);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);

        // One serializer instance serves both key and value, as both are strings.
        StringSerializer serializer = new StringSerializer();
        return new KafkaProducer<>(props, serializer, serializer);
    }

    /**
     * Builds a Kafka Streams topology that forwards every record from the
     * given source topics to one target topic.
     *
     * <p>The returned instance is only constructed here; this method does not
     * start it. The application id is fixed to {@code Stream-To-Long-Term}.
     *
     * @param str  value stored under {@code bootstrap.servers}
     * @param list source topics
     * @param str2 target topic
     * @param str3 path of a properties file with additional settings;
     *             may be {@code null} or blank for none
     * @return the configured {@link KafkaStreams} instance
     */
    public static KafkaStreams aggregateTopics(String str, List<String> list, String str2, String str3) {
        Properties props = loadPropsFromFile(str3);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Stream-To-Long-Term");
        props.put(BOOTSTRAP_SERVERS, str);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Values pass through unchanged; the identity mapping is kept so the
        // resulting topology is the same as before.
        builder.stream(list).mapValues(value -> value).to(str2);
        return new KafkaStreams(builder.build(), props);
    }

    /**
     * Loads properties from a file.
     *
     * <p>A {@code null} or blank path yields empty properties. An I/O failure
     * is logged at {@code SEVERE} and is not propagated; whatever was loaded
     * up to that point (possibly nothing) is returned.
     *
     * @param str path of the properties file, may be {@code null} or blank
     * @return the loaded properties, never {@code null}
     */
    public static Properties loadPropsFromFile(String str) {
        AlarmSystem.logger.fine("loading file from path: " + str);
        Properties properties = new Properties();
        if (str == null || str.isBlank()) {
            return properties;
        }
        // try-with-resources closes the stream even when load() throws.
        try (FileInputStream input = new FileInputStream(str)) {
            properties.load(input);
        } catch (IOException e) {
            AlarmSystem.logger.log(Level.SEVERE, "failed to load kafka properties", e);
        }
        return properties;
    }
}
