package org.apache.kafka.trogdor.common;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.agent.Agent;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.agent.AgentRestResource;
import org.apache.kafka.trogdor.basic.BasicNode;
import org.apache.kafka.trogdor.basic.BasicPlatform;
import org.apache.kafka.trogdor.basic.BasicTopology;
import org.apache.kafka.trogdor.coordinator.Coordinator;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorRestResource;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A Trogdor cluster made up of {@link Agent} instances and at most one
 * {@link Coordinator}, all created inside the current JVM.
 *
 * <p>Instances are created through {@link Builder}. Closing the cluster shuts down
 * the coordinator and every agent.
 */
public class MiniTrogdorCluster implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(MiniTrogdorCluster.class);

    /** The agents of this cluster, keyed by node name. */
    private final TreeMap<String, Agent> agents;

    /** The coordinator of this cluster, or null when none was configured. */
    private final Coordinator coordinator;

    /**
     * Collects the node names, scheduler and command runner for a
     * {@link MiniTrogdorCluster} and creates it.
     */
    public static class Builder {
        private final TreeSet<String> agentNames = new TreeSet<>();
        private String coordinatorName = null;
        private Scheduler scheduler = Scheduler.SYSTEM;
        private BasicPlatform.CommandRunner commandRunner = new BasicPlatform.ShellCommandRunner();

        /**
         * Mutable per-node scratch state used while the cluster is being built.
         * Fields stay null (or 0 for ports) until the corresponding build step has run.
         */
        public static class NodeData {
            String hostname;
            AgentRestResource agentRestResource;
            JsonRestServer agentRestServer;
            int agentPort;
            JsonRestServer coordinatorRestServer;
            int coordinatorPort;
            CoordinatorRestResource coordinatorRestResource;
            Platform platform;
            Agent agent;
            Coordinator coordinator;
            BasicNode node;

            private NodeData() {
            }
        }

        /**
         * Sets the scheduler handed to the platform, agents and coordinator.
         *
         * @param scheduler the scheduler to use
         * @return this builder
         */
        public Builder scheduler(Scheduler scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /**
         * Sets the command runner handed to each node's {@link BasicPlatform}.
         *
         * @param commandRunner the command runner to use
         * @return this builder
         */
        public Builder commandRunner(BasicPlatform.CommandRunner commandRunner) {
            this.commandRunner = commandRunner;
            return this;
        }

        /**
         * Places the coordinator on the named node.
         *
         * @param str the node name
         * @return this builder
         * @throws RuntimeException if a coordinator has already been added
         */
        public Builder addCoordinator(String str) {
            if (this.coordinatorName != null) {
                throw new RuntimeException("At most one coordinator is allowed.");
            }
            this.coordinatorName = str;
            return this;
        }

        /**
         * Places an agent on the named node.
         *
         * @param str the node name
         * @return this builder
         * @throws RuntimeException if the node already has an agent
         */
        public Builder addAgent(String str) {
            if (this.agentNames.contains(str)) {
                throw new RuntimeException("There is already an agent on node " + str);
            }
            this.agentNames.add(str);
            return this;
        }

        /** Returns the entry for the named node, creating it with hostname 127.0.0.1 if absent. */
        private NodeData getOrCreate(String str, TreeMap<String, NodeData> treeMap) {
            NodeData existing = treeMap.get(str);
            if (existing != null) {
                return existing;
            }
            NodeData created = new NodeData();
            created.hostname = "127.0.0.1";
            treeMap.put(str, created);
            return created;
        }

        /**
         * Starts the REST servers, builds the topology, and creates the agents and the
         * coordinator.
         *
         * <p>If any step fails, every agent and coordinator that was already created is
         * shut down before the failure is rethrown, so a failed build does not leave
         * them running. NOTE(review): REST servers that were started but never handed to
         * an Agent or Coordinator are not stopped here, because no shutdown method of
         * JsonRestServer is visible from this file.
         *
         * @return the running cluster
         * @throws Exception if a REST server, node, agent or coordinator cannot be
         *         created, or if the calling thread is interrupted while waiting
         */
        public MiniTrogdorCluster build() throws Exception {
            log.info("Creating MiniTrogdorCluster with agents: {} and coordinator: {}",
                Utils.join(this.agentNames, ", "), this.coordinatorName);
            TreeMap<String, NodeData> nodes = new TreeMap<>();
            try {
                startRestServers(nodes);
                BasicTopology topology = createTopology(nodes);
                initializeNodes(nodes, topology);
            } catch (Exception | Error e) {
                log.error("Failed to build MiniTrogdorCluster; shutting down the nodes that were created.", e);
                Exception shutdownFailure = shutdownAll(coordinatorOf(nodes), agentsOf(nodes).values());
                if (shutdownFailure != null && shutdownFailure != e) {
                    e.addSuppressed(shutdownFailure);
                }
                throw e;
            }
            return new MiniTrogdorCluster(agentsOf(nodes), coordinatorOf(nodes));
        }

        /** Starts one REST server per agent and one for the coordinator, recording the bound ports. */
        private void startRestServers(TreeMap<String, NodeData> nodes) throws Exception {
            for (String agentName : this.agentNames) {
                NodeData node = getOrCreate(agentName, nodes);
                node.agentRestResource = new AgentRestResource();
                node.agentRestServer = new JsonRestServer(0);
                node.agentRestServer.start(new Object[]{node.agentRestResource});
                node.agentPort = node.agentRestServer.port();
            }
            if (this.coordinatorName != null) {
                NodeData node = getOrCreate(this.coordinatorName, nodes);
                node.coordinatorRestResource = new CoordinatorRestResource();
                node.coordinatorRestServer = new JsonRestServer(0);
                node.coordinatorRestServer.start(new Object[]{node.coordinatorRestResource});
                node.coordinatorPort = node.coordinatorRestServer.port();
            }
        }

        /**
         * Creates a BasicNode for every entry, advertising whichever ports were bound,
         * and wraps them all in a BasicTopology.
         */
        // The topology map is left raw: the element type BasicTopology expects is not
        // named anywhere in this file, so it cannot be spelled out here.
        @SuppressWarnings({"rawtypes", "unchecked"})
        private static BasicTopology createTopology(TreeMap<String, NodeData> nodes) throws Exception {
            TreeMap topologyNodes = new TreeMap();
            for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
                NodeData node = entry.getValue();
                Map<String, String> config = new HashMap<>();
                // A port of 0 means that no server of that kind was started on this node.
                if (node.agentPort != 0) {
                    config.put("trogdor.agent.port", Integer.toString(node.agentPort));
                }
                if (node.coordinatorPort != 0) {
                    config.put("trogdor.coordinator.port", Integer.toString(node.coordinatorPort));
                }
                node.node = new BasicNode(entry.getKey(), node.hostname, config, Collections.<String>emptySet());
                topologyNodes.put(entry.getKey(), node.node);
            }
            return new BasicTopology(topologyNodes);
        }

        /**
         * Creates the platform, agent and coordinator of every node on a startup
         * thread and waits for all of them. The first failure seen is rethrown.
         */
        private void initializeNodes(TreeMap<String, NodeData> nodes, final BasicTopology topology) throws Exception {
            ScheduledExecutorService executor = Executors.newScheduledThreadPool(1,
                ThreadUtils.createThreadFactory("MiniTrogdorClusterStartupThread%d", false));
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            try {
                for (final Map.Entry<String, NodeData> entry : nodes.entrySet()) {
                    executor.submit(new Callable<Void>() {
                        @Override
                        public Void call() {
                            String nodeName = entry.getKey();
                            try {
                                NodeData node = entry.getValue();
                                node.platform = new BasicPlatform(nodeName, topology,
                                    Builder.this.scheduler, Builder.this.commandRunner);
                                if (node.agentRestResource != null) {
                                    node.agent = new Agent(node.platform, Builder.this.scheduler,
                                        node.agentRestServer, node.agentRestResource);
                                }
                                if (node.coordinatorRestResource != null) {
                                    node.coordinator = new Coordinator(node.platform, Builder.this.scheduler,
                                        node.coordinatorRestServer, node.coordinatorRestResource, 0L);
                                }
                            } catch (Throwable t) {
                                // Catch Throwable, not just Exception: the Future returned by
                                // submit() is never inspected, so anything escaping this method
                                // would be lost and build() would return an incomplete cluster.
                                log.error("Unable to initialize {}", nodeName, t);
                                failure.compareAndSet(null, t);
                            }
                            return null;
                        }
                    });
                }
                executor.shutdown();
                if (!executor.awaitTermination(1L, TimeUnit.DAYS)) {
                    throw new RuntimeException("Timed out waiting for the MiniTrogdorCluster nodes to initialize.");
                }
            } finally {
                // No-op after a clean termination; otherwise stops the non-daemon
                // startup thread so that it cannot outlive a failed build.
                executor.shutdownNow();
            }
            Throwable cause = failure.get();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause != null) {
                throw new RuntimeException(cause);
            }
        }

        /** Returns the agents that have been created so far, keyed by node name. */
        private static TreeMap<String, Agent> agentsOf(TreeMap<String, NodeData> nodes) {
            TreeMap<String, Agent> created = new TreeMap<>();
            for (Map.Entry<String, NodeData> entry : nodes.entrySet()) {
                if (entry.getValue().agent != null) {
                    created.put(entry.getKey(), entry.getValue().agent);
                }
            }
            return created;
        }

        /** Returns the coordinator that has been created, or null if there is none. */
        private static Coordinator coordinatorOf(TreeMap<String, NodeData> nodes) {
            Coordinator found = null;
            for (NodeData node : nodes.values()) {
                if (node.coordinator != null) {
                    found = node.coordinator;
                }
            }
            return found;
        }
    }

    private MiniTrogdorCluster(TreeMap<String, Agent> treeMap, Coordinator coordinator) {
        this.agents = treeMap;
        this.coordinator = coordinator;
    }

    /**
     * @return the agents of this cluster, keyed by node name (the internal map, not a copy)
     */
    public TreeMap<String, Agent> agents() {
        return this.agents;
    }

    /**
     * @return the coordinator, or null when none was configured
     */
    public Coordinator coordinator() {
        return this.coordinator;
    }

    /**
     * Creates a client that targets the coordinator's port on localhost.
     *
     * @return a new client configured with up to 10 tries
     * @throws RuntimeException if this cluster has no coordinator
     */
    public CoordinatorClient coordinatorClient() {
        if (this.coordinator == null) {
            throw new RuntimeException("No coordinator configured.");
        }
        return new CoordinatorClient.Builder().maxTries(10).target("localhost", this.coordinator.port()).build();
    }

    /**
     * Creates a client that targets the port of the named node's agent on localhost.
     *
     * @param str the node name
     * @return a new client configured with up to 10 tries
     * @throws RuntimeException if the node has no agent
     */
    public AgentClient agentClient(String str) {
        Agent agent = this.agents.get(str);
        if (agent == null) {
            throw new RuntimeException("No agent configured on node " + str);
        }
        return new AgentClient.Builder().maxTries(10).target("localhost", agent.port()).build();
    }

    /**
     * Shuts down the coordinator and all agents and waits for them to stop.
     *
     * <p>Every component is asked to stop even if an earlier one fails; the first
     * failure is thrown once all of them have been attempted, with later failures
     * attached as suppressed exceptions.
     *
     * @throws Exception the first failure hit while shutting down
     */
    @Override
    public void close() throws Exception {
        log.info("Closing MiniTrogdorCluster.");
        Exception failure = shutdownAll(this.coordinator, this.agents.values());
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Begins shutdown of the coordinator and the agents, then waits for the agents
     * and finally the coordinator. A failure in one step does not skip the others.
     *
     * @param coordinator the coordinator to stop, or null
     * @param agents the agents to stop
     * @return the first failure (carrying any later ones as suppressed), or null
     */
    private static Exception shutdownAll(Coordinator coordinator, Iterable<Agent> agents) {
        Exception failure = null;
        if (coordinator != null) {
            try {
                coordinator.beginShutdown(false);
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }
        }
        for (Agent agent : agents) {
            try {
                agent.beginShutdown();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }
        }
        for (Agent agent : agents) {
            try {
                agent.waitForShutdown();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }
        }
        if (coordinator != null) {
            try {
                coordinator.waitForShutdown();
            } catch (Exception e) {
                failure = recordFailure(failure, e);
            }
        }
        return failure;
    }

    /** Keeps the first failure and attaches every later one to it as suppressed. */
    private static Exception recordFailure(Exception first, Exception next) {
        if (next instanceof InterruptedException) {
            // Catching the exception cleared the flag; restore it for the caller.
            Thread.currentThread().interrupt();
        }
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }
}
