package org.phoebus.applications.alarm.talk;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.phoebus.applications.alarm.AlarmSystem;
import org.phoebus.applications.alarm.client.KafkaHelper;
import org.phoebus.applications.alarm.model.SeverityLevel;
import org.phoebus.applications.alarm.model.json.JsonModelReader;
import org.phoebus.applications.alarm.model.json.JsonTags;
import org.springframework.util.backoff.ExponentialBackOff;

/* loaded from: input_file:BOOT-INF/lib/app-alarm-model-4.6.8.jar:org/phoebus/applications/alarm/talk/TalkClient.class */
public class TalkClient {
    private final CopyOnWriteArrayList<TalkClientListener> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Consumer<String, String> consumer;
    private final Thread thread;

    public TalkClient(String str, String str2) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        this.consumer = KafkaHelper.connectConsumer(str, List.of(str2 + "Talk"), Collections.emptyList());
        this.thread = new Thread(this::run, "TalkClient");
        this.thread.setDaemon(true);
    }

    public void addListener(TalkClientListener talkClientListener) {
        this.listeners.add(talkClientListener);
    }

    public void removeListener(TalkClientListener talkClientListener) {
        if (!this.listeners.remove(talkClientListener)) {
            throw new IllegalStateException("Unknown listener");
        }
    }

    public void start() {
        this.thread.start();
    }

    public boolean isRunning() {
        return this.thread.isAlive();
    }

    private void run() {
        while (this.running.get()) {
            try {
                checkUpdates();
            } catch (Throwable th) {
                if (this.running.get()) {
                    AlarmSystem.logger.log(Level.SEVERE, "Alarm client model error", th);
                }
                return;
            } finally {
                this.consumer.close();
            }
        }
    }

    private void checkUpdates() {
        Iterator<ConsumerRecord<String, String>> it = this.consumer.poll(100L).iterator();
        while (it.hasNext()) {
            ConsumerRecord<String, String> next = it.next();
            String key = next.key();
            try {
                JsonNode jsonNode = (JsonNode) JsonModelReader.parseJsonText(next.value());
                String textValue = jsonNode.get(JsonTags.SEVERITY).textValue();
                boolean asBoolean = jsonNode.get(JsonTags.STANDOUT).asBoolean();
                String textValue2 = jsonNode.get(JsonTags.TALK).textValue();
                try {
                    Iterator<TalkClientListener> it2 = this.listeners.iterator();
                    while (it2.hasNext()) {
                        it2.next().messageReceived(SeverityLevel.valueOf(textValue), asBoolean, textValue2);
                    }
                } catch (Exception e) {
                    AlarmSystem.logger.log(Level.WARNING, "Talk error for " + textValue + ", " + textValue2, (Throwable) e);
                }
            } catch (Exception e2) {
                AlarmSystem.logger.log(Level.WARNING, "Parsing of talk message for " + key + " failed.", (Throwable) e2);
            }
        }
    }

    public void shutdown() {
        this.running.set(false);
        this.consumer.wakeup();
        try {
            this.thread.join(ExponentialBackOff.DEFAULT_INITIAL_INTERVAL);
        } catch (InterruptedException e) {
            AlarmSystem.logger.log(Level.WARNING, "Talk client thread doesn't shut down", (Throwable) e);
        }
        AlarmSystem.logger.info(this.thread.getName() + " shut down");
    }
}
