package io.kmachine.rest.server;

import com.fasterxml.jackson.databind.JsonNode;
import io.kcache.CacheUpdateHandler;
import io.kcache.KafkaCache;
import io.kcache.KafkaCacheConfig;
import io.kcache.utils.InMemoryCache;
import io.kmachine.KMachine;
import io.kmachine.model.StateMachine;
import io.kmachine.utils.ClientUtils;
import io.kmachine.utils.JsonSerde;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.eclipse.microprofile.config.inject.ConfigProperty;

/**
 * Application-scoped registry of {@link KMachine} instances.
 *
 * <p>State machine definitions are stored as JSON in a {@link KafkaCache}. Every
 * cache update is forwarded to {@link KMachineUpdateHandler}, which creates or
 * closes the corresponding {@link KMachine} held in an in-memory map.
 */
@ApplicationScoped
public class KMachineManager {

    /** Topic name passed to the cache as {@code kafkacache.topic}. */
    private static final String CACHE_TOPIC = "_kmachines";

    /** Group id passed to the cache as {@code kafkacache.group.id}. */
    private static final String CACHE_GROUP_ID = "kmachine-1";

    /** Prefix added to every key taken from {@link KMachineConfig#kafkaCacheConfig()}. */
    private static final String CACHE_CONFIG_PREFIX = "kafkacache.";

    /** Value of {@code quarkus.http.insecure-requests} that selects plain HTTP. */
    private static final String INSECURE_ENABLED = "enabled";

    @ConfigProperty(name = "kafka.bootstrap.servers")
    String bootstrapServers;

    @ConfigProperty(name = "quarkus.http.port")
    int port;

    @ConfigProperty(name = "quarkus.http.ssl-port")
    int sslPort;

    @ConfigProperty(name = "quarkus.http.insecure-requests")
    String insecureRequests;

    @Inject
    KMachineConfig config;

    private KafkaCacheConfig cacheConfig;
    private KafkaCache<String, JsonNode> cache;
    private final Map<String, KMachine> machines = new ConcurrentHashMap<>();

    /**
     * Cache callback that keeps {@link #machines} aligned with the cache contents.
     */
    class KMachineUpdateHandler implements CacheUpdateHandler<String, JsonNode> {
        KMachineUpdateHandler() {
        }

        /**
         * Reacts to a single cache update.
         *
         * <p>A {@code null} value removes and closes the machine registered under
         * {@code key}. Any other value is parsed into a {@link StateMachine}, and a
         * new {@link KMachine} is registered and configured for it.
         *
         * @param key            cache key of the updated entry
         * @param value          new value, or {@code null} when the entry was removed
         * @param oldValue       previous value (unused)
         * @param topicPartition partition the update came from (unused)
         * @param offset         unused
         * @param timestamp      unused
         */
        public void handleUpdate(String key, JsonNode value, JsonNode oldValue,
                                 TopicPartition topicPartition, long offset, long timestamp) {
            if (value == null) {
                KMachine removed = machines.remove(key);
                if (removed != null) {
                    removed.close();
                }
                return;
            }

            StateMachine definition = StateMachine.fromJsonNode(value);
            // NOTE(review): machines are registered under the state machine's name,
            // while removal looks them up by cache key. The two only line up when
            // callers use the name as the id -- confirm against callers of create().
            String name = definition.getName();
            KMachine machine = new KMachine(name, bootstrapServers(), definition);

            // Close whatever was registered under this name before; otherwise the
            // replaced instance is dropped from the map without ever being closed.
            KMachine previous = machines.put(name, machine);
            if (previous != null) {
                previous.close();
            }

            Properties streamsConfig = ClientUtils.streamsConfig(
                    name, "client-" + name, bootstrapServers(), JsonSerde.class, JsonSerde.class);
            streamsConfig.put("application.server", applicationServer());
            machine.configure(new StreamsBuilder(), streamsConfig);
        }
    }

    /**
     * Builds the cache configuration from the injected settings and initializes
     * the cache with a {@link KMachineUpdateHandler} attached.
     */
    @PostConstruct
    public void init() {
        // Re-key the user supplied settings under the "kafkacache." prefix.
        Map<String, Object> cacheProps = this.config.kafkaCacheConfig().entrySet().stream()
                .collect(Collectors.toMap(
                        entry -> CACHE_CONFIG_PREFIX + entry.getKey(),
                        entry -> entry.getValue()));
        // Fixed settings are written last so they override any user supplied value.
        cacheProps.put(CACHE_CONFIG_PREFIX + "bootstrap.servers", this.bootstrapServers);
        cacheProps.put(CACHE_CONFIG_PREFIX + "topic", CACHE_TOPIC);
        cacheProps.put(CACHE_CONFIG_PREFIX + "group.id", CACHE_GROUP_ID);
        cacheProps.put(CACHE_CONFIG_PREFIX + "client.id", CACHE_GROUP_ID + "-" + CACHE_TOPIC);

        this.cacheConfig = new KafkaCacheConfig(cacheProps);
        this.cache = new KafkaCache<>(this.cacheConfig, Serdes.String(), new JsonSerde(),
                new KMachineUpdateHandler(), new InMemoryCache<>());
        this.cache.init();
    }

    /**
     * Returns the base URI of this server: {@code http} or {@code https} depending
     * on the insecure-requests setting, followed by {@link #host()} and {@link #port()}.
     *
     * @return the server URI
     * @throws RuntimeException wrapping the {@link URISyntaxException} if the
     *                          assembled string is not a valid URI
     */
    public URI uri() {
        String scheme = insecureRequestsEnabled() ? "http" : "https";
        try {
            return new URI(scheme + "://" + host() + ":" + port());
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns {@code host:port}, the value stored under {@code application.server}
     * in each machine's streams configuration.
     *
     * @return host and port joined by a colon
     */
    public String applicationServer() {
        return host() + ":" + port();
    }

    /**
     * Returns the configured host name, falling back to the local canonical host
     * name when none is configured.
     *
     * @return the host name
     * @throws ConfigException if the fallback is needed and the local host name
     *                         cannot be resolved
     */
    public String host() {
        // orElseGet defers the local host lookup (which may throw) until it is
        // actually needed, instead of running it on every call.
        return this.config.hostName().orElseGet(KMachineManager::getDefaultHost);
    }

    /**
     * Returns the plain HTTP port when insecure requests are enabled, otherwise
     * the SSL port.
     *
     * @return the port
     */
    public int port() {
        return insecureRequestsEnabled() ? this.port : this.sslPort;
    }

    /** Tells whether {@code quarkus.http.insecure-requests} equals "enabled", ignoring case. */
    private boolean insecureRequestsEnabled() {
        return INSECURE_ENABLED.equalsIgnoreCase(this.insecureRequests);
    }

    /**
     * Resolves the canonical name of the local host.
     *
     * @throws ConfigException if the local host cannot be resolved
     */
    private static String getDefaultHost() {
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            throw new ConfigException("Unknown local hostname", e);
        }
    }

    /**
     * @return the configured {@code kafka.bootstrap.servers} value
     */
    public String bootstrapServers() {
        return this.bootstrapServers;
    }

    /**
     * @return the cache configuration built by {@link #init()}
     */
    public KafkaCacheConfig cacheConfig() {
        return this.cacheConfig;
    }

    /** Delegates to {@code sync()} on the underlying cache. */
    public void sync() {
        this.cache.sync();
    }

    /**
     * Stores a new state machine definition in the cache.
     *
     * @param str          id to store the definition under
     * @param stateMachine definition to store
     * @throws IllegalArgumentException if the cache already contains {@code str}
     */
    public void create(String str, StateMachine stateMachine) {
        if (this.cache.containsKey(str)) {
            throw new IllegalArgumentException("KMachine already exists with id: " + str);
        }
        this.cache.put(str, stateMachine.toJsonNode());
    }

    /**
     * @return the key set of the underlying cache
     */
    public Set<String> list() {
        return this.cache.keySet();
    }

    /**
     * Looks up a registered machine.
     *
     * @param str key to look up
     * @return the registered machine, or {@code null} if none is registered under {@code str}
     */
    public KMachine get(String str) {
        return this.machines.get(str);
    }

    /**
     * Removes the entry stored under the given id from the cache.
     *
     * @param str id of the entry to remove
     */
    public void remove(String str) {
        this.cache.remove(str);
    }
}
