package io.hyperfoil.clustering;

import com.fasterxml.jackson.core.JsonEncoding;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.hyperfoil.api.BenchmarkExecutionException;
import io.hyperfoil.api.config.Agent;
import io.hyperfoil.api.config.Benchmark;
import io.hyperfoil.api.config.Phase;
import io.hyperfoil.api.config.RunHook;
import io.hyperfoil.api.deployment.DeployedAgent;
import io.hyperfoil.api.deployment.Deployer;
import io.hyperfoil.api.session.PhaseInstance;
import io.hyperfoil.clustering.AgentInfo;
import io.hyperfoil.clustering.ControllerPhase;
import io.hyperfoil.clustering.Run;
import io.hyperfoil.clustering.messages.AgentControlMessage;
import io.hyperfoil.clustering.messages.AgentHello;
import io.hyperfoil.clustering.messages.AgentReadyMessage;
import io.hyperfoil.clustering.messages.AgentStatusMessage;
import io.hyperfoil.clustering.messages.ErrorMessage;
import io.hyperfoil.clustering.messages.PhaseChangeMessage;
import io.hyperfoil.clustering.messages.PhaseControlMessage;
import io.hyperfoil.clustering.messages.RequestStatsMessage;
import io.hyperfoil.clustering.messages.SessionStatsMessage;
import io.hyperfoil.clustering.messages.StatsMessage;
import io.hyperfoil.clustering.util.PersistenceUtil;
import io.hyperfoil.controller.CsvWriter;
import io.hyperfoil.controller.JsonWriter;
import io.hyperfoil.controller.StatisticsStore;
import io.hyperfoil.core.hooks.ExecRunHook;
import io.hyperfoil.core.util.CountDown;
import io.hyperfoil.internal.Controller;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.cluster.NodeListener;
import io.vertx.ext.cluster.infinispan.InfinispanClusterManager;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commons.api.BasicCacheContainer;

/* loaded from: input_file:io/hyperfoil/clustering/ControllerVerticle.class */
public class ControllerVerticle extends AbstractVerticle implements NodeListener {
    /** Class logger. NOTE(review): its initializer is not visible in this part of the file. */
    private static final Logger log;
    /** Vert.x event bus; assigned in {@code start}. */
    private EventBus eb;
    /** HTTP/REST front-end created in {@code start} and shut down in {@code stop}. */
    private ControllerServer server;
    /** Agent deployer; only looked up in {@code start} when Vert.x is clustered, otherwise stays {@code null}. */
    private Deployer deployer;
    /** Source of numeric run ids; rendered as four upper-case hex digits in {@code createRun}. */
    private AtomicInteger runIds = new AtomicInteger();
    /** Benchmarks keyed by name. NOTE(review): population happens outside the visible part of the file. */
    private Map<String, Benchmark> benchmarks = new HashMap<>();
    /** Id of the pending {@code runSimulation} timer, or -1 when none is scheduled. */
    private long timerId = -1;
    /** All known runs (loaded from disk and created in this process), keyed by run id. */
    Map<String, Run> runs = new HashMap<>();
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.hyperfoil.clustering.ControllerVerticle$1, reason: invalid class name */
    /* loaded from: input_file:io/hyperfoil/clustering/ControllerVerticle$1.class */
    // Compiler-generated lookup table backing the switch in tryProgressStatus: it maps
    // PhaseInstance.Status ordinals to the dense case labels 1..3. Entries for statuses
    // not listed below stay 0 and therefore match no case.
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$hyperfoil$api$session$PhaseInstance$Status = new int[PhaseInstance.Status.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$io$hyperfoil$api$session$PhaseInstance$Status[PhaseInstance.Status.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$hyperfoil$api$session$PhaseInstance$Status[PhaseInstance.Status.FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$hyperfoil$api$session$PhaseInstance$Status[PhaseInstance.Status.TERMINATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Starts the controller verticle.
     *
     * <p>Creates the {@link ControllerServer}, reloads previously persisted runs from
     * {@code Controller.RUN_DIR}, makes sure the hook directories exist, registers the
     * event-bus consumers for agent discovery, agent status responses and statistics,
     * resolves the deployer when running clustered and finally triggers benchmark loading.
     *
     * @param promise completed through the {@link CountDown} created here
     * @throws IllegalStateException when clustered and no deployer named
     *         {@code Controller.DEPLOYER} can be found through {@link ServiceLoader}
     */
    public void start(Promise<Void> promise) {
        log.info("Starting in directory {}...", Controller.ROOT_DIR);
        CountDown countDown = new CountDown(promise, 2);
        this.server = new ControllerServer(this, countDown);
        this.vertx.exceptionHandler(th -> log.error("Uncaught error: ", th));
        if (Files.exists(Controller.RUN_DIR)) {
            // Files.list keeps the directory open until the stream is closed.
            try (Stream<Path> runDirs = Files.list(Controller.RUN_DIR)) {
                runDirs.forEach(this::updateRuns);
            } catch (IOException e) {
                log.error("Could not list run dir contents", e);
            } catch (Exception e) {
                log.error("Cannot load previous runs from {}", e, Controller.RUN_DIR);
            }
        }
        Controller.HOOKS_DIR.resolve("pre").toFile().mkdirs();
        Controller.HOOKS_DIR.resolve("post").toFile().mkdirs();

        this.eb = this.vertx.eventBus();
        this.eb.consumer(Feeds.DISCOVERY, this::onAgentHello);
        this.eb.consumer(Feeds.RESPONSE, this::onAgentStatus);
        this.eb.consumer(Feeds.STATS, this::onStats);

        if (this.vertx.isClustered()) {
            for (Deployer.Factory factory : ServiceLoader.load(Deployer.Factory.class)) {
                log.debug("Found deployer {}", factory.name());
                if (Controller.DEPLOYER.equals(factory.name())) {
                    this.deployer = factory.create();
                    break;
                }
            }
            if (this.deployer == null) {
                throw new IllegalStateException("Hyperfoil is running in clustered mode but it couldn't load deployer '" + Controller.DEPLOYER + "'");
            }
            if (this.vertx instanceof VertxInternal) {
                // getClusterManager() is only exposed on the internal Vert.x interface.
                ((VertxInternal) this.vertx).getClusterManager().nodeListener(this);
            }
        }
        if (!Controller.BENCHMARK_DIR.toFile().exists() && !Controller.BENCHMARK_DIR.toFile().mkdirs()) {
            log.error("Failed to create benchmark directory: {}", Controller.BENCHMARK_DIR);
        }
        countDown.increment();
        loadBenchmarks(countDown);
        countDown.countDown();
    }

    /** Handles an {@link AgentHello} on the discovery feed: registers the agent and, once no agent is still starting, initializes the run. */
    private void onAgentHello(Message<Object> message) {
        AgentHello hello = (AgentHello) message.body();
        String runId = hello.runId();
        Run run = this.runs.get(runId);
        if (run == null) {
            log.error("Unknown run ID {}", runId);
            message.fail(1, "Unknown run ID");
            return;
        }
        AgentInfo agent = run.agents.stream()
                .filter(candidate -> candidate.name.equals(hello.name()))
                .findAny().orElse(null);
        if (agent == null) {
            log.error("Unknown agent {} ({}/{})", hello.name(), hello.nodeId(), hello.deploymentId());
            message.fail(1, "Unknown agent");
            return;
        }
        if (agent.status != AgentInfo.Status.STARTING) {
            log.info("Ignoring message, {} is not starting", agent.name);
            message.reply("Ignoring");
            return;
        }
        log.debug("Registering agent {} ({}/{})", hello.name(), hello.nodeId(), hello.deploymentId());
        agent.nodeId = hello.nodeId();
        agent.deploymentId = hello.deploymentId();
        agent.status = AgentInfo.Status.REGISTERED;
        message.reply("Registered");
        if (run.agents.stream().allMatch(a -> a.status != AgentInfo.Status.STARTING)) {
            handleAgentsStarted(run);
        } else {
            log.debug("Waiting for registration from agents {}", run.agents.stream()
                    .filter(a -> a.status == AgentInfo.Status.STARTING)
                    .collect(Collectors.toList()));
        }
    }

    /** Handles an {@link AgentStatusMessage} on the response feed: phase changes, agent errors and readiness notifications. */
    private void onAgentStatus(Message<Object> message) {
        AgentStatusMessage status = (AgentStatusMessage) message.body();
        Run run = this.runs.get(status.runId());
        if (run == null) {
            log.error("No run {}", status.runId());
            return;
        }
        // Agents that have not registered yet have no deploymentId; skip them instead of failing with an NPE.
        AgentInfo agent = run.agents.stream()
                .filter(candidate -> candidate.deploymentId != null && candidate.deploymentId.equals(status.senderId()))
                .findAny().orElse(null);
        if (agent == null) {
            log.error("No agent {} in run {}", status.senderId(), run.id);
            return;
        }
        if (status instanceof PhaseChangeMessage) {
            handlePhaseChange(run, agent, (PhaseChangeMessage) status);
        } else if (status instanceof ErrorMessage) {
            ErrorMessage errorMessage = (ErrorMessage) status;
            run.errors.add(new Run.Error(agent, errorMessage.error()));
            if (errorMessage.isFatal()) {
                agent.status = AgentInfo.Status.FAILED;
                stopSimulation(run);
            }
        } else if (status instanceof AgentReadyMessage) {
            agent.status = AgentInfo.Status.READY;
            if (run.agents.stream().allMatch(a -> a.status == AgentInfo.Status.READY)) {
                startSimulation(run);
            }
        } else {
            log.error("Unexpected type of message: {}", status);
        }
    }

    /** Handles a {@link StatsMessage} on the stats feed and acknowledges it with {@code "OK"}. */
    private void onStats(Message<Object> message) {
        if (!(message.body() instanceof StatsMessage)) {
            log.error("Unknown message type: " + message.body());
            return;
        }
        StatsMessage stats = (StatsMessage) message.body();
        Run run = this.runs.get(stats.runId);
        if (run == null) {
            log.error("Unknown run {}", stats.runId);
        } else if (run.statisticsStore != null) {
            if (stats instanceof RequestStatsMessage) {
                recordRequestStats(run, (RequestStatsMessage) stats);
            } else if (stats instanceof SessionStatsMessage) {
                SessionStatsMessage sessionStats = (SessionStatsMessage) stats;
                log.trace("Run {}: Received session pool stats from {}", sessionStats.runId, sessionStats.address);
                for (Map.Entry<String, SessionStatsMessage.MinMax> entry : sessionStats.sessionStats.entrySet()) {
                    run.statisticsStore.recordSessionStats(sessionStats.address, sessionStats.timestamp, entry.getKey(), entry.getValue().min, entry.getValue().max);
                }
            }
        }
        message.reply("OK");
    }

    /** Records request statistics and, when every agent reported completion for the phase, completes the phase and validates SLAs. */
    private void recordRequestStats(Run run, RequestStatsMessage stats) {
        String phase = run.phase(stats.phaseId);
        if (stats.statistics != null) {
            log.debug("Run {}: Received stats from {}: {}/{}/{}:{} ({} requests)", stats.runId, stats.address, phase, stats.stepId, stats.metric, stats.statistics.sequenceId, stats.statistics.requestCount);
            run.statisticsStore.record(stats.address, stats.phaseId, stats.stepId, stats.metric, stats.statistics);
        }
        if (!stats.isPhaseComplete) {
            return;
        }
        log.debug("Run {}: Received stats completion for phase {} from {}", run.id, phase, stats.address);
        // Null-safe comparison: unregistered agents have no deploymentId yet.
        AgentInfo agent = run.agents.stream()
                .filter(candidate -> candidate.deploymentId != null && candidate.deploymentId.equals(stats.address))
                .findFirst().orElse(null);
        if (agent == null) {
            log.error("Run {}: Cannot find agent {}", run.id, stats.address);
        } else if (agent.phases.put(phase, PhaseInstance.Status.STATS_COMPLETE) == PhaseInstance.Status.STATS_COMPLETE) {
            log.info("Run {}: stats for phase {} are already completed, ignoring.", run.id, phase);
        } else if (run.agents.stream().map(a -> a.phases.get(phase)).allMatch(s -> s == PhaseInstance.Status.STATS_COMPLETE)) {
            log.info("Run {}: completed stats for phase {}", run.id, phase);
            run.statisticsStore.completePhase(phase);
            if (!run.statisticsStore.validateSlas()) {
                log.info("SLA validation failed for {}", phase);
                ControllerPhase controllerPhase = run.phases.get(phase);
                controllerPhase.setFailed();
                failNotStartedPhases(run, controllerPhase);
            }
        }
    }

    /**
     * Applies a phase status change reported by one agent.
     *
     * <p>Stores the reported status for the agent, registers a failure when the session limit
     * was exceeded (failing the phase unless it is an open-model phase whose policy is
     * {@code CONTINUE}), fails the phase when the agent reported an error, and then lets the
     * controller re-evaluate the phase and the whole simulation.
     *
     * @param run the run the message belongs to
     * @param agentInfo the agent that sent the message
     * @param phaseChangeMessage the received notification
     */
    private void handlePhaseChange(Run run, AgentInfo agentInfo, PhaseChangeMessage phaseChangeMessage) {
        String phase = phaseChangeMessage.phase();
        log.debug("{} Received phase change from {}: {} is {} (session limit exceeded={}, errors={})", run.id, phaseChangeMessage.senderId(), phase, phaseChangeMessage.status(), phaseChangeMessage.sessionLimitExceeded(), phaseChangeMessage.getError());
        agentInfo.phases.put(phase, phaseChangeMessage.status());
        ControllerPhase controllerPhase = run.phases.get(phase);
        if (phaseChangeMessage.sessionLimitExceeded()) {
            // Keep the general Phase type here; only open-model phases carry a session limit policy.
            Phase definition = controllerPhase.definition();
            run.statisticsStore.addFailure(definition.name, null, controllerPhase.absoluteStartTime(), System.currentTimeMillis(), "Exceeded session limit");
            if (definition instanceof Phase.OpenModelPhase
                    && ((Phase.OpenModelPhase) definition).sessionLimitPolicy == Phase.SessionLimitPolicy.CONTINUE) {
                log.warn("{} Phase {} session limit exceeded, continuing due to policy {}", run.id, definition.name, ((Phase.OpenModelPhase) definition).sessionLimitPolicy);
            } else {
                log.info("{} Failing phase due to exceeded session limit.", run.id);
                controllerPhase.setFailed();
            }
        }
        if (phaseChangeMessage.getError() != null) {
            log.error("{} Failing phase {} as agent {} reports error: {}", run.id, controllerPhase.definition().name, agentInfo.name, phaseChangeMessage.getError().getMessage());
            controllerPhase.setFailed();
            run.errors.add(new Run.Error(agentInfo, phaseChangeMessage.getError()));
        }
        tryProgressStatus(run, phase);
        runSimulation(run);
    }

    /**
     * Cluster callback invoked when a node joins; the controller takes no action here.
     *
     * @param str id of the node that joined
     */
    public void nodeAdded(String str) {
        // Intentionally empty.
    }

    /**
     * Cluster callback invoked when a node leaves.
     *
     * <p>For every run that has not terminated yet, the first agent hosted on the departed
     * node is marked as failed, an error is recorded, and the run is killed and stopped.
     * Runs without an agent on that node are left untouched.
     *
     * @param str id of the node that left the cluster
     */
    public void nodeLeft(String str) {
        for (Run run : this.runs.values()) {
            if (run.terminateTime.future().isComplete()) {
                continue;
            }
            // A plain for-each ends when the agents are exhausted, so runs without a matching agent are simply skipped.
            for (AgentInfo agent : run.agents) {
                if (Objects.equals(agent.nodeId, str)) {
                    agent.status = AgentInfo.Status.FAILED;
                    run.errors.add(new Run.Error(agent, new BenchmarkExecutionException("Agent unexpectedly left the cluster.")));
                    kill(run, asyncResult -> {
                    });
                    stopSimulation(run);
                    break;
                }
            }
        }
    }

    /**
     * Registers a previously persisted run found in the run directory.
     *
     * <p>Only entries whose name consists of exactly four upper-case hex digits are considered.
     * The run id counter is advanced past the parsed id, and the run is rebuilt as a completed
     * placeholder from {@code info.json} (when present) in that directory.
     *
     * @param path an entry of the run directory
     */
    private void updateRuns(Path path) {
        File file = path.toFile();
        String name = file.getName();
        if (!name.matches("[0-9A-F][0-9A-F][0-9A-F][0-9A-F]")) {
            return;
        }
        int parsedId = Integer.parseInt(name, 16);
        if (parsedId >= this.runIds.get()) {
            this.runIds.set(parsedId + 1);
        }
        Path infoFile = path.resolve("info.json");
        JsonObject info = new JsonObject();
        if (infoFile.toFile().exists() && infoFile.toFile().isFile()) {
            try {
                info = new JsonObject(new String(Files.readAllBytes(infoFile), StandardCharsets.UTF_8));
            } catch (Exception e) {
                // Keep the cause in the log; without it a corrupted info.json cannot be diagnosed.
                log.error("Cannot read info for run {}", e, name);
                return;
            }
        }
        Benchmark placeholder = new Benchmark(info.getString("benchmark", "<unknown>"), (String) null, Collections.emptyMap(), new Agent[0], 0, Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), 0L, (String) null, Collections.emptyList(), Collections.emptyList());
        Run run = new Run(name, path, placeholder);
        run.completed = true;
        run.startTime = info.getLong("startTime", 0L);
        run.terminateTime.complete(info.getLong("terminateTime", 0L));
        run.description = info.getString("description");
        JsonArray errors = info.getJsonArray("errors");
        if (errors != null) {
            run.errors.addAll(errors.stream()
                    .map(JsonObject.class::cast)
                    .map(error -> new Run.Error(new AgentInfo(error.getString("agent"), -1), new Throwable(error.getString("msg"))))
                    .collect(Collectors.toList()));
        }
        run.cancelled = info.getBoolean("cancelled", Boolean.FALSE);
        this.runs.put(name, run);
    }

    /**
     * Stops the verticle: closes the deployer when one was created, then asks the
     * controller server to stop, handing it the given promise.
     *
     * @param promise passed on to {@code ControllerServer.stop}
     * @throws Exception when closing the deployer fails
     */
    public void stop(Promise<Void> promise) throws Exception {
        Deployer activeDeployer = this.deployer;
        if (activeDeployer != null) {
            activeDeployer.close();
        }
        ControllerServer controllerServer = this.server;
        controllerServer.stop(promise);
    }

    /**
     * Moves the controller-side phase status forward based on what the agents reported.
     *
     * <p>The status with the lowest ordinal among all agents is used; nothing happens while
     * any agent has not reported a status for the phase yet. RUNNING, FINISHED and TERMINATED
     * are mirrored to the controller phase (TERMINATED also stamps the completion time); other
     * statuses cause no transition. A failed phase cancels the phases that have not started.
     *
     * @param run the run owning the phase
     * @param str name of the phase
     */
    private void tryProgressStatus(Run run, String str) {
        PhaseInstance.Status leastAdvanced = null;
        for (AgentInfo agent : run.agents) {
            PhaseInstance.Status reported = agent.phases.get(str);
            if (reported == null) {
                // At least one agent has not reported this phase yet.
                return;
            }
            if (leastAdvanced == null || reported.ordinal() < leastAdvanced.ordinal()) {
                leastAdvanced = reported;
            }
        }
        ControllerPhase controllerPhase = run.phases.get(str);
        if (controllerPhase == null) {
            log.error("Cannot find phase {} in run {}", str, run.id);
            return;
        }
        switch (leastAdvanced) {
            case RUNNING:
                controllerPhase.status(run.id, ControllerPhase.Status.RUNNING);
                break;
            case FINISHED:
                controllerPhase.status(run.id, ControllerPhase.Status.FINISHED);
                break;
            case TERMINATED:
                controllerPhase.status(run.id, ControllerPhase.Status.TERMINATED);
                controllerPhase.absoluteCompletionTime(System.currentTimeMillis());
                break;
        }
        if (controllerPhase.isFailed()) {
            failNotStartedPhases(run, controllerPhase);
        }
    }

    /**
     * Cancels every phase of the run that is still in {@code NOT_STARTED} state.
     *
     * @param run the run whose phases are inspected
     * @param controllerPhase the failed phase; only used for the log message
     */
    private void failNotStartedPhases(Run run, ControllerPhase controllerPhase) {
        log.info("Phase {} failed, cancelling other phases...", controllerPhase.definition().name());
        run.phases.values().stream()
                .filter(candidate -> candidate.status() == ControllerPhase.Status.NOT_STARTED)
                .forEach(pending -> pending.status(run.id, ControllerPhase.Status.CANCELLED));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Allocates a new run for the benchmark.
     *
     * <p>The run id is the next counter value formatted as four upper-case hex digits; the
     * run directory is created under {@code Controller.RUN_DIR}, a statistics store is
     * attached, the run is registered and the benchmark is persisted into the run directory.
     *
     * @param benchmark benchmark to execute
     * @param str description stored on the run
     * @return the newly registered run
     */
    public Run createRun(Benchmark benchmark, String str) {
        int sequence = this.runIds.getAndIncrement();
        String runId = String.format("%04X", Integer.valueOf(sequence));
        Path runDir = Controller.RUN_DIR.resolve(runId);
        runDir.toFile().mkdirs();

        Run created = new Run(runId, runDir, benchmark);
        created.description = str;
        created.statisticsStore = new StatisticsStore(created.benchmark,
                failure -> log.warn("Failed verify SLA(s) for {}/{}: {}", failure.phase(), failure.metric(), failure.message()));
        this.runs.put(created.id, created);
        PersistenceUtil.store(created.benchmark, created.dir);
        return created;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts deploying agents for the run.
     *
     * <p>The run is rejected when one of its agents is used by another run that has not
     * terminated, or when the benchmark's agent definitions do not match the server mode
     * (clustered servers need agents, non-clustered servers accept none). Without agents an
     * in-VM {@code AgentVerticle} is deployed; otherwise every agent is started through the
     * deployer on a worker thread. A timer stops the run when deployment takes longer than
     * {@code Controller.DEPLOY_TIMEOUT}.
     *
     * @param run the run to start
     * @return {@code null} when deployment was initiated, otherwise the rejection reason
     */
    public String startBenchmark(Run run) {
        HashSet<String> busyAgents = new HashSet<>();
        for (Run other : this.runs.values()) {
            // Collect agents of the OTHER active runs; the run being started is registered in the map as well.
            if (other != run && !other.terminateTime.future().isComplete()) {
                for (AgentInfo agentInfo : other.agents) {
                    busyAgents.add(agentInfo.name);
                }
            }
        }
        for (Agent agent : run.benchmark.agents()) {
            if (busyAgents.contains(agent.name)) {
                return rejectBeforeStart(run, "Agent " + agent + " is already used; try starting the benchmark later");
            }
        }
        if (run.benchmark.agents().length == 0) {
            if (this.vertx.isClustered()) {
                return rejectBeforeStart(run, "Server is started in clustered mode; benchmarks must define agents.");
            }
            run.agents.add(new AgentInfo("in-vm", 0));
            this.vertx.deployVerticle(AgentVerticle.class, new DeploymentOptions().setConfig(new JsonObject().put("runId", run.id).put("name", "in-vm")));
        } else {
            if (!this.vertx.isClustered()) {
                // Terminate the run like the other rejection paths do, so it does not look active forever.
                return rejectBeforeStart(run, "Server is not started as clustered and does not accept benchmarks with agents defined.");
            }
            log.info("Starting agents for run {}", run.id);
            int index = 0;
            for (Agent agent : run.benchmark.agents()) {
                AgentInfo agentInfo = new AgentInfo(agent.name, index++);
                run.agents.add(agentInfo);
                log.debug("Starting agent {}", agent.name);
                this.vertx.executeBlocking(promise -> {
                    agentInfo.deployedAgent = this.deployer.start(agent, run.id, run.benchmark, th -> {
                        run.errors.add(new Run.Error(agentInfo, new BenchmarkExecutionException("Failed to deploy agent", th)));
                        log.error("Failed to deploy agent {}", th, agent.name);
                        this.vertx.runOnContext(ignored -> stopSimulation(run));
                    });
                    // Signal completion so the blocking task does not stay pending forever.
                    promise.complete();
                }, false, asyncResult -> {
                    if (asyncResult.failed()) {
                        run.errors.add(new Run.Error(agentInfo, new BenchmarkExecutionException("Failed to start agent", asyncResult.cause())));
                        log.error("Failed to start agent {}", asyncResult.cause(), agent.name);
                    }
                });
            }
        }
        run.deployTimerId = this.vertx.setTimer(Controller.DEPLOY_TIMEOUT, timer -> {
            log.error("{} Deployment timed out.", run.id);
            run.errors.add(new Run.Error(null, new BenchmarkExecutionException("Deployment timed out.")));
            stopSimulation(run);
        });
        return null;
    }

    /** Marks a run that never started as completed (start time equal to terminate time) and returns the given reason. */
    private String rejectBeforeStart(Run run, String reason) {
        long now = System.currentTimeMillis();
        run.startTime = now;
        run.terminateTime.complete(Long.valueOf(now));
        run.completed = true;
        return reason;
    }

    /**
     * Called once no agent of the run is in STARTING state any more: cancels the deployment
     * timeout and sends an INITIALIZE command carrying the benchmark to every registered agent.
     * An agent whose initialization request fails is marked FAILED and the run is stopped.
     *
     * @param run the run whose agents have registered
     */
    private void handleAgentsStarted(Run run) {
        this.vertx.cancelTimer(run.deployTimerId);
        log.info("Starting benchmark {} - run {}", run.benchmark.name(), run.id);
        for (AgentInfo agent : run.agents) {
            if (agent.status == AgentInfo.Status.REGISTERED) {
                AgentControlMessage initialize = new AgentControlMessage(AgentControlMessage.Command.INITIALIZE, agent.id, run.benchmark);
                this.eb.request(agent.deploymentId, initialize, reply -> {
                    if (!reply.succeeded()) {
                        agent.status = AgentInfo.Status.FAILED;
                        log.error("{} Agent {}({}) failed to initialize", reply.cause(), run.id, agent.name, agent.deploymentId);
                        run.errors.add(new Run.Error(agent, reply.cause()));
                        stopSimulation(run);
                    }
                });
            } else {
                log.error("{} Already initializing {}, status is {}!", run.id, agent.deploymentId, agent.status);
            }
        }
    }

    /**
     * Runs the pre-hooks on a worker thread and, when all of them succeed, creates the
     * controller phases and enters the simulation loop on the event loop.
     *
     * <p>Hooks loaded via {@code loadHooks("pre")} and the benchmark's own pre-hooks are
     * sorted and executed in order; the output of each is stored in {@code run.hookResults}.
     * The first failing hook records an error, fails the blocking task and stops the run.
     *
     * @param run the run to start
     */
    private void startSimulation(Run run) {
        this.vertx.executeBlocking(promise -> {
            List<RunHook> hooks = loadHooks("pre");
            hooks.addAll(run.benchmark.preHooks());
            Collections.sort(hooks);
            for (RunHook hook : hooks) {
                StringBuilder output = new StringBuilder();
                boolean success = hook.run(getRunProperties(run), output::append);
                run.hookResults.add(new Run.RunHookOutput(hook.name(), output.toString()));
                if (!success) {
                    run.errors.add(new Run.Error(null, new BenchmarkExecutionException("Execution of run hook " + hook.name() + " failed.")));
                    promise.fail("Execution of pre-hook " + hook.name() + " failed.");
                    // Return instead of falling through: the promise must not be completed after it has failed.
                    return;
                }
            }
            promise.complete();
        }, asyncResult -> {
            if (asyncResult.succeeded()) {
                this.vertx.runOnContext(ignored -> {
                    if (!$assertionsDisabled && run.startTime != Long.MIN_VALUE) {
                        throw new AssertionError();
                    }
                    run.startTime = System.currentTimeMillis();
                    for (Phase phase : run.benchmark.phases()) {
                        run.phases.put(phase.name(), new ControllerPhase(phase));
                    }
                    runSimulation(run);
                });
            } else {
                // Pass the cause as the throwable so that the stack trace is logged.
                log.error("{} Failed to start the simulation", asyncResult.cause(), run.id);
                stopSimulation(run);
            }
        });
    }

    /**
     * One iteration of the controller's scheduling loop for a run.
     *
     * <p>Cancels the pending timer, then for each phase: sends FINISH to running phases whose
     * duration has elapsed; for finished phases sends TERMINATE once the maximum duration has
     * elapsed, or TRY_TERMINATE when all phases listed in {@code terminateAfterStrict()} are
     * terminated. Afterwards the phases returned by {@code run.getAvailablePhases()} are started.
     * When every phase is terminated the run is stopped; otherwise the method re-schedules
     * itself, waiting at most one second.
     *
     * @param run the run to progress
     */
    private void runSimulation(Run run) {
        if (this.timerId >= 0) {
            this.vertx.cancelTimer(this.timerId);
            this.timerId = -1L;
        }
        long now = System.currentTimeMillis();
        for (ControllerPhase phase : run.phases.values()) {
            if (phase.status() == ControllerPhase.Status.RUNNING && phase.absoluteStartTime() + phase.definition().duration() <= now) {
                this.eb.publish(Feeds.CONTROL, new PhaseControlMessage(PhaseControlMessage.Command.FINISH, phase.definition().name));
                phase.status(run.id, ControllerPhase.Status.FINISHING);
            }
            if (phase.status() == ControllerPhase.Status.FINISHED) {
                if (phase.definition().maxDuration() < 0 || phase.absoluteStartTime() + phase.definition().maxDuration() > now) {
                    // Look the strict dependencies up in this run's phases.
                    boolean dependenciesTerminated = phase.definition().terminateAfterStrict().stream()
                            .map(run.phases::get)
                            .allMatch(dependency -> dependency.status().isTerminated());
                    if (dependenciesTerminated) {
                        this.eb.publish(Feeds.CONTROL, new PhaseControlMessage(PhaseControlMessage.Command.TRY_TERMINATE, phase.definition().name));
                    }
                } else {
                    this.eb.publish(Feeds.CONTROL, new PhaseControlMessage(PhaseControlMessage.Command.TERMINATE, phase.definition().name));
                    phase.status(run.id, ControllerPhase.Status.TERMINATING);
                }
            }
        }
        for (ControllerPhase available : run.getAvailablePhases()) {
            this.eb.publish(Feeds.CONTROL, new PhaseControlMessage(PhaseControlMessage.Command.RUN, available.definition().name));
            available.absoluteStartTime(now);
            available.status(run.id, ControllerPhase.Status.STARTING);
        }
        if (run.phases.values().stream().allMatch(phase -> phase.status().isTerminated())) {
            log.info("{} All phases are terminated.", run.id);
            stopSimulation(run);
            return;
        }
        long delay = Math.min(run.nextTimestamp() - System.currentTimeMillis(), 1000L);
        log.debug("Wait {} ms", delay);
        if (delay <= 0) {
            // Already due: re-run on the next event-loop tick instead of arming a timer.
            this.vertx.runOnContext(ignored -> runSimulation(run));
            return;
        }
        if (this.timerId >= 0) {
            this.vertx.cancelTimer(this.timerId);
        }
        this.timerId = this.vertx.setTimer(delay, timer -> runSimulation(run));
    }

    /**
     * Terminates the run: completes its terminate time, marks it completed and sends STOP to
     * every agent that has a deployment id. Agents without one only get their deployed agent
     * (if any) stopped directly. Does nothing but log when the run is already terminated.
     *
     * <p>NOTE(review): when the STOP request fails, {@code checkAgentsStopped} is not invoked
     * from the reply handler; confirm this is intended.
     *
     * @param run the run to stop
     */
    private void stopSimulation(Run run) {
        if (run.terminateTime.future().isComplete()) {
            log.warn("Run {} already completed.", run.id);
            return;
        }
        run.terminateTime.complete(Long.valueOf(System.currentTimeMillis()));
        run.completed = true;
        for (AgentInfo agent : run.agents) {
            if (agent.deploymentId == null) {
                // The agent never registered; there is nobody to send STOP to.
                if (!$assertionsDisabled && agent.status != AgentInfo.Status.STARTING) {
                    throw new AssertionError();
                }
                if (agent.deployedAgent != null) {
                    agent.deployedAgent.stop();
                }
                continue;
            }
            AgentControlMessage stopCommand = new AgentControlMessage(AgentControlMessage.Command.STOP, agent.id, null);
            this.eb.request(agent.deploymentId, stopCommand, reply -> {
                if (!reply.succeeded()) {
                    agent.status = AgentInfo.Status.FAILED;
                    log.error("Agent {}/{} failed to stop", reply.cause(), agent.name, agent.deploymentId);
                } else {
                    agent.status = AgentInfo.Status.STOPPED;
                    checkAgentsStopped(run);
                    log.debug("Agent {}/{} stopped.", agent.name, agent.deploymentId);
                }
                if (agent.deployedAgent != null) {
                    // The deployed agent is stopped three seconds after the reply.
                    this.vertx.setTimer(3000L, timer -> {
                        agent.deployedAgent.stop();
                    });
                }
            });
        }
        checkAgentsStopped(run);
    }

    /**
     * Finalizes the run once every agent has reached a status whose ordinal is at least that of
     * {@code AgentInfo.Status.STOPPED}.
     * <p>
     * When that holds, the timestamps of every phase are pushed into the statistics store, the run
     * is persisted and completion is logged. Otherwise the method returns without doing anything.
     *
     * @param run the run to check
     */
    private void checkAgentsStopped(Run run) {
        int stoppedOrdinal = AgentInfo.Status.STOPPED.ordinal();
        boolean everyAgentDone = run.agents.stream().allMatch(agent -> agent.status.ordinal() >= stoppedOrdinal);
        if (!everyAgentDone) {
            return;
        }
        for (ControllerPhase phase : run.phases.values()) {
            run.statisticsStore.adjustPhaseTimestamps(phase.definition().name(), phase.absoluteStartTime(), phase.absoluteCompletionTime());
        }
        persistRun(run);
        log.info("Run {} completed", new Object[]{run.id});
    }

    /**
     * Persists the results of a run from a Vert.x blocking task.
     * <p>
     * The following artifacts are written into {@code run.dir}, each step being attempted even if
     * a previous one failed:
     * <ol>
     * <li>CSV statistics under {@code stats},</li>
     * <li>{@code info.json} with the run metadata and recorded errors,</li>
     * <li>{@code all.json} with the statistics in JSON form,</li>
     * <li>{@code hooks.json} with the output of the post-hooks, which are executed in sorted order
     * until the first one reports failure.</li>
     * </ol>
     * The first I/O failure is recorded on the promise; later failures are only logged. This uses
     * {@code tryFail} because calling {@code fail} on an already failed promise would throw.
     * The completion callback sets {@code run.completed} and logs the outcome.
     *
     * @param run the run to persist
     */
    private void persistRun(Run run) {
        this.vertx.executeBlocking(promise -> {
            try {
                CsvWriter.writeCsv(run.dir.resolve("stats"), run.statisticsStore);
            } catch (IOException e) {
                log.error("Failed to persist statistics", e);
                promise.tryFail(e);
            }

            List<JsonObject> errors = run.errors.stream().map(error -> {
                JsonObject json = new JsonObject();
                if (error.agent != null) {
                    json.put("agent", error.agent.name);
                }
                return json.put("msg", error.error.getMessage());
            }).collect(Collectors.toList());
            JsonObject info = new JsonObject()
                    .put("id", run.id)
                    .put("benchmark", run.benchmark.name())
                    .put("startTime", Long.valueOf(run.startTime))
                    .put("terminateTime", (Long) run.terminateTime.future().result())
                    .put("cancelled", Boolean.valueOf(run.cancelled))
                    .put("description", run.description)
                    .put("errors", new JsonArray(errors));
            try {
                Files.write(run.dir.resolve("info.json"), info.encodePrettily().getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                log.error("Cannot write info file", e);
                promise.tryFail(e);
            }

            JsonFactory jsonFactory = new JsonFactory();
            jsonFactory.setCodec(new ObjectMapper());
            // try-with-resources closes both the generator and the underlying stream even when writing fails.
            try (FileOutputStream stream = new FileOutputStream(run.dir.resolve("all.json").toFile());
                 JsonGenerator generator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8)) {
                generator.setCodec(new ObjectMapper());
                JsonWriter.writeArrayJsons(run.statisticsStore, generator, info);
                generator.flush();
            } catch (IOException e) {
                log.error("Cannot write all.json file", e);
                promise.tryFail(e);
            }

            // loadHooks may hand back an immutable list, so merge into a private mutable copy.
            List<RunHook> postHooks = new java.util.ArrayList<>(loadHooks("post"));
            postHooks.addAll(run.benchmark.postHooks());
            Collections.sort(postHooks);
            for (RunHook hook : postHooks) {
                StringBuilder output = new StringBuilder();
                boolean success = hook.run(getRunProperties(run), output::append);
                run.hookResults.add(new Run.RunHookOutput(hook.name(), output.toString()));
                if (!success) {
                    log.error("Execution of post-hook " + hook.name() + " failed.");
                    break;
                }
            }

            try {
                List<JsonObject> hookOutputs = run.hookResults.stream()
                        .map(hookOutput -> new JsonObject().put("name", hookOutput.name).put("output", hookOutput.output))
                        .collect(Collectors.toList());
                Files.write(run.dir.resolve("hooks.json"), new JsonArray(hookOutputs).encodePrettily().getBytes(StandardCharsets.UTF_8));
            } catch (IOException e) {
                log.error("Cannot write hook results", e);
                promise.tryFail(e);
            }
            // No-op when one of the steps above has already failed the promise.
            promise.tryComplete();
        }, outcome -> {
            run.completed = true;
            if (outcome.failed()) {
                log.error("Failed to persist run {}", outcome.cause(), new Object[]{run.id});
            } else {
                log.info("Successfully persisted run {}", new Object[]{run.id});
            }
        });
    }

    /**
     * Builds the properties passed to run hooks.
     * <p>
     * Always contains {@code RUN_ID}, {@code RUN_DIR} and {@code BENCHMARK}. {@code RUN_DESCRIPTION}
     * is added only when the run has a description, and {@code BENCHMARK_PATH} only when a
     * {@code <benchmark name>.yaml} file exists in the benchmark directory.
     *
     * @param run the run to describe
     * @return a new map with the properties
     */
    private Map<String, String> getRunProperties(Run run) {
        Map<String, String> properties = new HashMap<>();
        properties.put("RUN_ID", run.id);
        properties.put("RUN_DIR", Controller.RUN_DIR.resolve(run.id).toAbsolutePath().toString());
        if (run.description != null) {
            properties.put("RUN_DESCRIPTION", run.description);
        }
        properties.put("BENCHMARK", run.benchmark.name());
        File definition = Controller.BENCHMARK_DIR.resolve(run.benchmark.name() + ".yaml").toFile();
        if (definition.exists()) {
            properties.put("BENCHMARK_PATH", definition.getAbsolutePath());
        }
        return properties;
    }

    /**
     * Looks up a run by the key it is registered under in the runs map.
     *
     * @param str key of the run
     * @return the mapped run, or whatever the underlying map returns for an unknown key
     */
    public Run run(String str) {
        return runs.get(str);
    }

    /**
     * Returns the values view of the runs map.
     *
     * @return all runs currently held by this verticle
     */
    public Collection<Run> runs() {
        return runs.values();
    }

    /**
     * Cancels a run.
     * <p>
     * The run is flagged as cancelled. Phases that are not terminated yet are either moved to
     * {@code CANCELLED} (when they have not started) or moved to {@code TERMINATING} with a
     * {@code TERMINATE} command published on the control feed. The handler is notified when
     * {@code run.terminateTime} completes; any throwable raised while cancelling is reported to
     * the handler as a failed future.
     *
     * @param run     the run to kill
     * @param handler notified with an empty result once the run's termination time completes
     */
    public void kill(Run run, Handler<AsyncResult<Void>> handler) {
        log.info("{} Killing run", new Object[]{run.id});
        try {
            run.cancelled = true;
            for (Map.Entry<String, ControllerPhase> entry : run.phases.entrySet()) {
                ControllerPhase phase = entry.getValue();
                ControllerPhase.Status current = phase.status();
                if (current.isTerminated()) {
                    continue;
                }
                if (current != ControllerPhase.Status.NOT_STARTED) {
                    phase.status(run.id, ControllerPhase.Status.TERMINATING);
                    this.eb.publish(Feeds.CONTROL, new PhaseControlMessage(PhaseControlMessage.Command.TERMINATE, entry.getKey()));
                } else {
                    phase.status(run.id, ControllerPhase.Status.CANCELLED);
                }
            }
            run.terminateTime.future().onComplete(terminated -> handler.handle(terminated.mapEmpty()));
        } catch (Throwable failure) {
            handler.handle(Future.failedFuture(failure));
        }
    }

    /**
     * Registers a benchmark and stores it in the benchmark directory from a blocking task.
     * <p>
     * When {@code str} is non-null it is compared with the version of the benchmark currently
     * registered under the same name; if there is no such benchmark or the versions differ, the
     * update is rejected, a message is logged and the handler is not invoked.
     *
     * @param benchmark the benchmark to register
     * @param str       expected version of the currently registered benchmark, or {@code null} to skip the check
     * @param handler   notified when the blocking store task finishes
     * @return {@code true} if the benchmark was accepted, {@code false} if the version check failed
     */
    public boolean addBenchmark(Benchmark benchmark, String str, Handler<AsyncResult<Void>> handler) {
        if (str != null) {
            Benchmark current = this.benchmarks.get(benchmark.name());
            if (current == null || !str.equals(current.version())) {
                log.info("Updating benchmark {}, version {} but current version is {}", new Object[]{
                        benchmark.name(),
                        str,
                        current == null ? "<non-existent>" : current.version()
                });
                return false;
            }
        }
        this.benchmarks.put(benchmark.name(), benchmark);
        this.vertx.executeBlocking(storing -> {
            PersistenceUtil.store(benchmark, Controller.BENCHMARK_DIR);
            storing.complete();
        }, handler);
        return true;
    }

    /**
     * Returns the key set of the benchmarks map.
     *
     * @return names under which benchmarks are registered
     */
    public Collection<String> getBenchmarks() {
        return benchmarks.keySet();
    }

    /**
     * Looks up a registered benchmark.
     *
     * @param str key the benchmark is registered under
     * @return the mapped benchmark, or whatever the underlying map returns for an unknown key
     */
    public Benchmark getBenchmark(String str) {
        return benchmarks.get(str);
    }

    /**
     * Loads every benchmark found in the benchmark directory from a blocking task.
     * <p>
     * Each entry of {@code Controller.BENCHMARK_DIR} is passed to {@code PersistenceUtil.load};
     * non-null results are registered under their name. A failure to load a single entry, or to
     * list the directory at all, is logged and does not fail the task.
     *
     * @param handler notified when the blocking task finishes
     */
    private void loadBenchmarks(Handler<AsyncResult<Void>> handler) {
        this.vertx.executeBlocking(promise -> {
            // Files.list holds an open directory handle, so the stream must be closed explicitly.
            try (java.util.stream.Stream<Path> entries = Files.list(Controller.BENCHMARK_DIR)) {
                entries.forEach(path -> {
                    try {
                        Benchmark loaded = PersistenceUtil.load(path);
                        if (loaded != null) {
                            this.benchmarks.put(loaded.name(), loaded);
                        }
                    } catch (Exception loadFailure) {
                        log.error("Failed to load a benchmark from {}", loadFailure, new Object[]{path});
                    }
                });
            } catch (IOException listFailure) {
                log.error("Failed to list benchmark dir {}", listFailure, new Object[]{Controller.BENCHMARK_DIR});
            }
            promise.complete();
        }, handler);
    }

    /**
     * Collects the executable hooks stored in a subdirectory of {@code Controller.HOOKS_DIR}.
     * <p>
     * Every entry that is neither a directory nor hidden becomes an {@link ExecRunHook} named
     * after the file and pointing at its absolute path. If the subdirectory does not exist, is not
     * a directory, or cannot be listed (the failure is logged), the result is empty.
     *
     * @param str name of the subdirectory to scan
     * @return a new, mutable list of hooks; callers are free to add to it
     */
    private List<RunHook> loadHooks(String str) {
        // Always a fresh mutable list: persistRun appends the benchmark's own hooks to the result.
        List<RunHook> hooks = new java.util.ArrayList<>();
        File hookDir = Controller.HOOKS_DIR.resolve(str).toFile();
        if (!hookDir.exists() || !hookDir.isDirectory()) {
            return hooks;
        }
        // Files.list holds an open directory handle, so the stream must be closed explicitly.
        try (java.util.stream.Stream<Path> entries = Files.list(hookDir.toPath())) {
            entries.map(Path::toFile)
                    .filter(candidate -> !candidate.isDirectory() && !candidate.isHidden())
                    .forEach(candidate -> hooks.add(new ExecRunHook(candidate.getName(), candidate.getAbsolutePath())));
        } catch (IOException e) {
            log.error("Failed to list hooks.", e);
        }
        return hooks;
    }

    /**
     * Sends {@code LIST_SESSIONS} to the agents of a run and forwards every returned entry.
     * <p>
     * The reply body of each agent is treated as a list of strings; each element is passed to
     * {@code biConsumer} together with the agent it came from.
     *
     * @param run        the run whose agents are queried
     * @param z          flag sent to the agents as the command parameter
     * @param biConsumer receives one call per (agent, entry) pair
     * @param handler    completion handler passed on to {@code invokeOnAgents}
     */
    public void listSessions(Run run, boolean z, BiConsumer<AgentInfo, String> biConsumer, Handler<AsyncResult<Void>> handler) {
        invokeOnAgents(run, AgentControlMessage.Command.LIST_SESSIONS, z, handler, (agent, reply) -> {
            for (Object session : (List<?>) reply.result().body()) {
                biConsumer.accept(agent, (String) session);
            }
        });
    }

    /**
     * Sends {@code LIST_CONNECTIONS} to the agents of a run and forwards every returned entry.
     * <p>
     * The reply body of each agent is treated as a list of strings; each element is passed to
     * {@code biConsumer} together with the agent it came from.
     *
     * @param run        the run whose agents are queried
     * @param biConsumer receives one call per (agent, entry) pair
     * @param handler    completion handler passed on to {@code invokeOnAgents}
     */
    public void listConnections(Run run, BiConsumer<AgentInfo, String> biConsumer, Handler<AsyncResult<Void>> handler) {
        invokeOnAgents(run, AgentControlMessage.Command.LIST_CONNECTIONS, null, handler, (agent, reply) -> {
            for (Object connection : (List<?>) reply.result().body()) {
                biConsumer.accept(agent, (String) connection);
            }
        });
    }

    /**
     * Sends a control command to every reachable agent of a run and reports overall completion.
     * <p>
     * Agents whose status ordinal is at least that of {@code STOPPED}, and agents that have no
     * {@code deploymentId} (and therefore no event bus address), are skipped. Each successful
     * reply is passed to {@code biConsumer}. The completion handler is invoked exactly once: with
     * a failure as soon as the first agent request fails, or with success after all requested
     * agents replied successfully (immediately if no agent was contacted).
     *
     * @param run        the run whose agents are addressed
     * @param command    command to send
     * @param obj        command parameter, may be {@code null}
     * @param handler    completion handler, invoked at most once
     * @param biConsumer receives each successful reply together with the replying agent
     */
    private void invokeOnAgents(Run run, AgentControlMessage.Command command, Object obj, Handler<AsyncResult<Void>> handler, BiConsumer<AgentInfo, AsyncResult<Message<Object>>> biConsumer) {
        // Starts at 1 so that completion cannot fire before all requests have been issued.
        AtomicInteger pending = new AtomicInteger(1);
        // Guards the completion handler: several agents may fail, but the caller must hear only once.
        java.util.concurrent.atomic.AtomicBoolean notified = new java.util.concurrent.atomic.AtomicBoolean();
        for (AgentInfo agentInfo : run.agents) {
            if (agentInfo.status.ordinal() >= AgentInfo.Status.STOPPED.ordinal()) {
                log.debug("Cannot invoke command on {}, status: {}", new Object[]{agentInfo.name, agentInfo.status});
                continue;
            }
            if (agentInfo.deploymentId == null) {
                // Such an agent cannot be addressed on the event bus yet.
                log.debug("Cannot invoke command on {}, agent has no deployment id", new Object[]{agentInfo.name});
                continue;
            }
            pending.incrementAndGet();
            this.eb.request(agentInfo.deploymentId, new AgentControlMessage(command, agentInfo.id, obj), reply -> {
                if (reply.failed()) {
                    log.error("Failed to connect to agent {}", reply.cause(), new Object[]{agentInfo.name});
                    if (notified.compareAndSet(false, true)) {
                        handler.handle(Future.failedFuture(reply.cause()));
                    }
                    return;
                }
                biConsumer.accept(agentInfo, reply);
                if (pending.decrementAndGet() == 0 && notified.compareAndSet(false, true)) {
                    handler.handle(Future.succeededFuture());
                }
            });
        }
        if (pending.decrementAndGet() == 0 && notified.compareAndSet(false, true)) {
            handler.handle(Future.succeededFuture());
        }
    }

    /**
     * Tells whether a controller log is available.
     *
     * @return {@code false} when there is no deployer, otherwise the deployer's own answer
     */
    public boolean hasControllerLog() {
        if (this.deployer == null) {
            return false;
        }
        return this.deployer.hasControllerLog();
    }

    /**
     * Asks the deployer, from a blocking task, to download the controller log.
     * <p>
     * The handler is handed to the deployer itself; the blocking task's own callback only forwards
     * a failure of the task to the handler.
     *
     * @param j       numeric argument passed through to the deployer (presumably an offset into the log; verify against the deployer)
     * @param file    destination, passed to the deployer as a string
     * @param handler passed to the deployer and additionally notified if the blocking task fails
     */
    public void downloadControllerLog(long j, File file, Handler<AsyncResult<Void>> handler) {
        this.vertx.executeBlocking(
                blocking -> this.deployer.downloadControllerLog(j, file.toString(), handler),
                outcome -> {
                    if (!outcome.failed()) {
                        return;
                    }
                    handler.handle(Future.failedFuture(outcome.cause()));
                });
    }

    /**
     * Asks the deployer, from a blocking task, to download the log of a deployed agent.
     * <p>
     * The handler is handed to the deployer itself; the blocking task's own callback only forwards
     * a failure of the task to the handler.
     *
     * @param deployedAgent the agent whose log is requested
     * @param j             numeric argument passed through to the deployer (presumably an offset into the log; verify against the deployer)
     * @param file          destination, passed to the deployer as a string
     * @param handler       passed to the deployer and additionally notified if the blocking task fails
     */
    public void downloadAgentLog(DeployedAgent deployedAgent, long j, File file, Handler<AsyncResult<Void>> handler) {
        this.vertx.executeBlocking(
                blocking -> this.deployer.downloadAgentLog(deployedAgent, j, file.toString(), handler),
                outcome -> {
                    if (!outcome.failed()) {
                        return;
                    }
                    handler.handle(Future.failedFuture(outcome.cause()));
                });
    }

    /**
     * Makes sure the benchmark attached to a run carries its source, reloading it if necessary.
     * <p>
     * When {@code run.benchmark.source()} is {@code null}, the run directory is searched for
     * {@code <benchmark name>.serialized} and then {@code <benchmark name>.yaml}. The first file
     * that exists and loads successfully replaces {@code run.benchmark}. A file that fails to load
     * (the loader yields {@code null}) is skipped instead of wiping the benchmark already attached
     * to the run. If nothing can be loaded, a warning is logged and the current benchmark is kept.
     *
     * @param run the run whose benchmark is requested
     * @return the benchmark attached to the run after the attempt to reload it
     */
    public Benchmark ensureBenchmark(Run run) {
        if (run.benchmark.source() != null) {
            return run.benchmark;
        }
        for (String extension : new String[]{".serialized", ".yaml"}) {
            File candidate = Controller.RUN_DIR.resolve(run.id).resolve(run.benchmark.name() + extension).toFile();
            if (!candidate.exists() || !candidate.isFile()) {
                continue;
            }
            Benchmark loaded = PersistenceUtil.load(candidate.toPath());
            if (loaded != null) {
                run.benchmark = loaded;
                return run.benchmark;
            }
        }
        log.warn("Cannot find benchmark source for run " + run.id + ", benchmark " + run.benchmark.name());
        return run.benchmark;
    }

    /**
     * Closes the Vert.x instance.
     * <p>
     * When a cluster manager is present, its cache container is obtained before closing and is
     * stopped from the close callback.
     */
    public void shutdown() {
        InfinispanClusterManager clusterManager = this.vertx.getClusterManager();
        if (clusterManager == null) {
            this.vertx.close();
            return;
        }
        BasicCacheContainer cacheContainer = clusterManager.getCacheContainer();
        this.vertx.close(closed -> cacheContainer.stop());
    }

    /**
     * Reports the port of the controller's HTTP server.
     *
     * @return the value of {@code actualPort()} of the server's HTTP server
     */
    public int actualPort() {
        return server.httpServer.actualPort();
    }

    /**
     * Resolves the directory of a run.
     *
     * @param run the run
     * @return {@code Controller.RUN_DIR} resolved against the run id
     */
    public Path getRunDir(Run run) {
        Path runDir = Controller.RUN_DIR.resolve(run.id);
        return runDir;
    }

    /**
     * Returns the configuration of this verticle's context.
     *
     * @return the result of {@code context.config()}
     */
    public JsonObject getConfig() {
        return context.config();
    }

    // Static initialization of the class-level assertion flag and logger.
    static {
        // True when assertions are NOT enabled for this class; read by the explicit
        // `!$assertionsDisabled && ...` checks in this class before throwing AssertionError.
        $assertionsDisabled = !ControllerVerticle.class.desiredAssertionStatus();
        // Logger used by the methods of this class.
        log = LoggerFactory.getLogger(ControllerVerticle.class);
    }
}
