package org.phoebus.alarm.logging;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.api.PushCommand;
import org.eclipse.jgit.api.RemoteRemoveCommand;
import org.eclipse.jgit.api.errors.GitAPIException;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.transport.AmazonS3;
import org.eclipse.jgit.transport.URIish;
import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
import org.eclipse.jgit.util.FS;
import org.phoebus.applications.alarm.AlarmSystem;
import org.phoebus.applications.alarm.client.AlarmClient;
import org.phoebus.applications.alarm.model.xml.XmlModelWriter;

/* loaded from: input_file:BOOT-INF/classes/org/phoebus/alarm/logging/AlarmConfigLogger.class */
public class AlarmConfigLogger implements Runnable {
    private final String topic;
    private final String remoteLocation;
    private final File root;
    private final AlarmClient model;
    private static final String REMOTE_NAME = "remote";
    KafkaStreams streams = null;
    ObjectMapper objectMapper = new ObjectMapper();
    private final String group_id = "Alarm-" + UUID.randomUUID();
    private Properties props = PropertiesHelper.getProperties();

    /* loaded from: input_file:BOOT-INF/classes/org/phoebus/alarm/logging/AlarmConfigLogger$ProcessAlarmConfigMessage.class */
    private class ProcessAlarmConfigMessage implements Processor<String, String> {
        private ProcessAlarmConfigMessage() {
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public synchronized void process(String str, String str2) {
            AlarmConfigLogger.this.processAlarmConfigMessages(str, str2, true);
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void close() {
        }
    }

    public AlarmConfigLogger(String str, String str2, String str3) {
        this.topic = str;
        this.remoteLocation = str3;
        this.props.put(StreamsConfig.APPLICATION_ID_CONFIG, "AlarmConfigLogger-streams-" + this.topic);
        if (!this.props.containsKey("bootstrap.servers")) {
            this.props.put("bootstrap.servers", "localhost:9092");
        }
        this.props.put(ConsumerConfig.GROUP_ID_CONFIG, this.group_id);
        this.props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.root = new File(str2, this.topic);
        this.root.mkdirs();
        this.model = new AlarmClient(this.props.getProperty("bootstrap.servers"), this.topic);
        this.model.start();
        initialize();
    }

    private void initialize() {
        if (!this.root.isDirectory()) {
            this.root.mkdirs();
        }
        if (!RepositoryCache.FileKey.isGitRepository(this.root, FS.detect())) {
            try {
                Git call = Git.init().setDirectory(this.root).setBare(false).call();
                try {
                    AlarmSystem.logger.log(Level.INFO, "Created repository: " + call.getRepository().getDirectory());
                    if (call != null) {
                        call.close();
                    }
                } finally {
                }
            } catch (IllegalStateException | GitAPIException e) {
                AlarmSystem.logger.log(Level.WARNING, "Failed to initiate the git repo", e);
            }
        }
        if (this.remoteLocation != null && !this.remoteLocation.isEmpty()) {
            try {
                Git open = Git.open(this.root, FS.detect());
                URIish uRIish = new URIish(this.remoteLocation);
                RemoteRemoveCommand remoteRemove = open.remoteRemove();
                remoteRemove.setName("remote");
                remoteRemove.call();
                open.remoteAdd().setName("remote").setUri(uRIish).call();
            } catch (IOException | URISyntaxException | GitAPIException e2) {
                AlarmSystem.logger.log(Level.WARNING, "Failed to properly configure remote", e2);
            }
        }
        writeAlarmModel();
        try {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.props, (Deserializer) Serdes.String().deserializer(), (Deserializer) Serdes.String().deserializer());
            try {
                kafkaConsumer.subscribe(List.of(this.topic), new ConsumerRebalanceListener() { // from class: org.phoebus.alarm.logging.AlarmConfigLogger.1
                    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                    }

                    @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    }
                });
                syncAlarmConfigRepository(kafkaConsumer.poll(Duration.ofSeconds(10L)));
                kafkaConsumer.close();
            } finally {
            }
        } catch (Exception e3) {
            AlarmSystem.logger.log(Level.WARNING, "Failed to create the alarm model", (Throwable) e3);
        }
        try {
            Git open2 = Git.open(this.root);
            try {
                open2.add().addFilepattern(".").call();
                open2.commit().setAll(true).setMessage("Dump of the alarm configuration of the server").call();
                if (open2 != null) {
                    open2.close();
                }
            } finally {
            }
        } catch (IOException | GitAPIException e4) {
            AlarmSystem.logger.log(Level.WARNING, "Failed to commit the dump of the alarm config", e4);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            streamsBuilder.stream(this.topic, Consumed.with(Serdes.String(), Serdes.String())).process(new ProcessorSupplier<String, String>() { // from class: org.phoebus.alarm.logging.AlarmConfigLogger.2
                @Override // org.apache.kafka.streams.processor.ProcessorSupplier
                public Processor<String, String> get() {
                    return new ProcessAlarmConfigMessage();
                }
            }, new String[0]);
            Topology build = streamsBuilder.build();
            AlarmSystem.logger.config(build.describe().toString());
            this.streams = new KafkaStreams(build, this.props);
        } catch (Exception e) {
            AlarmSystem.logger.log(Level.WARNING, "Failed to commit the alarm config message", (Throwable) e);
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-" + this.topic + "-alarm-shutdown-hook") { // from class: org.phoebus.alarm.logging.AlarmConfigLogger.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                AlarmConfigLogger.this.streams.close();
                countDownLatch.countDown();
            }
        });
        try {
            this.streams.start();
            countDownLatch.await();
        } catch (Throwable th) {
            System.exit(1);
        }
        System.exit(0);
    }

    private synchronized void processAlarmConfigMessages(String str, String str2, boolean z) {
        String replaceAll;
        try {
            if (str.contains("config:/")) {
                String str3 = str.split("config:/")[1];
                AlarmSystem.logger.log(Level.INFO, "processing message:" + str3 + ":" + str2);
                if (str2 != null) {
                    replaceAll = str3.replaceAll("[:|?*]", "_");
                    File file = Paths.get(this.root.getParent(), replaceAll).toFile();
                    file.mkdirs();
                    try {
                        FileWriter fileWriter = new FileWriter(new File(file, "alarm_config.json"));
                        try {
                            fileWriter.write(this.objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this.objectMapper.readValue(str2, Object.class)));
                            fileWriter.close();
                        } catch (Throwable th) {
                            try {
                                fileWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                            throw th;
                        }
                    } catch (IOException e) {
                        AlarmSystem.logger.log(Level.WARNING, "Alarm config logging failed for path " + replaceAll + ", config " + str2, (Throwable) e);
                    }
                } else {
                    replaceAll = str3.replaceAll("[:|?*]", "_");
                    Path path = Paths.get(this.root.getParent(), replaceAll);
                    if (path.toFile().exists()) {
                        Files.walk(path, new FileVisitOption[0]).map((v0) -> {
                            return v0.toFile();
                        }).forEach((v0) -> {
                            v0.delete();
                        });
                        path.toFile().delete();
                    }
                }
                writeAlarmModel();
                if (z) {
                    try {
                        Git open = Git.open(this.root);
                        try {
                            open.add().addFilepattern(".").call();
                            open.commit().setAll(true).setMessage("Alarm config update " + replaceAll).call();
                            if (this.remoteLocation != null && !this.remoteLocation.isEmpty()) {
                                PushCommand push = open.push();
                                push.setRemote("remote");
                                push.setForce(true);
                                push.setCredentialsProvider(new UsernamePasswordCredentialsProvider(this.props.getProperty("username"), this.props.getProperty(AmazonS3.Keys.PASSWORD)));
                                push.call();
                            }
                            if (open != null) {
                                open.close();
                            }
                        } catch (Throwable th3) {
                            if (open != null) {
                                try {
                                    open.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            }
                            throw th3;
                        }
                    } catch (IOException | GitAPIException e2) {
                        AlarmSystem.logger.log(Level.WARNING, "Failed to commit the configuration changes", e2);
                    }
                }
            }
        } catch (Exception e3) {
            AlarmSystem.logger.log(Level.WARNING, "Alarm state check error for path " + str + ", config " + str2, (Throwable) e3);
        }
    }

    private synchronized void syncAlarmConfigRepository(ConsumerRecords<String, String> consumerRecords) {
        Iterator<ConsumerRecord<String, String>> it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord<String, String> next = it.next();
            processAlarmConfigMessages(next.key(), next.value(), false);
        }
    }

    private synchronized void writeAlarmModel() {
        File file = Paths.get(this.root.getPath(), ".restore-script").toFile();
        if (!file.mkdirs() && !file.exists()) {
            AlarmSystem.logger.log(Level.WARNING, "Alarm config logging failed to create .restore-script folder");
        }
        try {
            OutputStream newOutputStream = Files.newOutputStream(new File(file, "config.xml").toPath(), new OpenOption[0]);
            try {
                XmlModelWriter xmlModelWriter = new XmlModelWriter(newOutputStream);
                try {
                    xmlModelWriter.write(this.model.getRoot());
                    xmlModelWriter.close();
                    if (newOutputStream != null) {
                        newOutputStream.close();
                    }
                } catch (Throwable th) {
                    try {
                        xmlModelWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } finally {
            }
        } catch (Exception e) {
            AlarmSystem.logger.log(Level.WARNING, "Alarm config logging failed to dump the alarm configuration to config.xml", (Throwable) e);
        }
    }
}
