package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import java.io.File;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.OperatingSystem;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(120)
/* loaded from: input_file:org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest.class */
public class ExternalCommandWorkerTest {

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/ExternalCommandWorkerTest$ExternalCommandWorkerBuilder.class */
    static class ExternalCommandWorkerBuilder {
        private final String id;
        private int shutdownGracePeriodMs = 3000000;
        private String[] command = new String[0];
        private final ObjectNode workload = new ObjectNode(JsonNodeFactory.instance);

        ExternalCommandWorkerBuilder(String str) {
            this.id = str;
            this.workload.set("foo", new TextNode("value1"));
            this.workload.set("bar", new IntNode(123));
        }

        ExternalCommandWorker build() {
            return new ExternalCommandWorker(this.id, new ExternalCommandSpec(0L, 30000L, "node0", Arrays.asList(this.command), this.workload, Optional.of(Integer.valueOf(this.shutdownGracePeriodMs))));
        }

        ExternalCommandWorkerBuilder command(String... strArr) {
            this.command = strArr;
            return this;
        }

        ExternalCommandWorkerBuilder shutdownGracePeriodMs(int i) {
            this.shutdownGracePeriodMs = i;
            return this;
        }
    }

    /**
     * Runs the {@code true} command and expects the worker to complete its
     * done-future with an empty string.
     *
     * <p>Reported as skipped (rather than silently passing) on Windows.
     */
    @Test
    public void testProcessWithNormalExit() throws Exception {
        Assumptions.assumeFalse(OperatingSystem.IS_WINDOWS, "requires the POSIX 'true' command");
        ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("trueTask").command("true").build();
        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
        worker.start((Platform) null, new AgentWorkerStatusTracker(), doneFuture);
        Assertions.assertEquals("", doneFuture.get());
        worker.stop((Platform) null);
    }

    /**
     * Runs the {@code false} command and expects the worker to complete its
     * done-future with {@code "exited with return code 1"}.
     *
     * <p>Reported as skipped (rather than silently passing) on Windows.
     */
    @Test
    public void testProcessWithFailedExit() throws Exception {
        Assumptions.assumeFalse(OperatingSystem.IS_WINDOWS, "requires the POSIX 'false' command");
        ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("falseTask").command("false").build();
        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
        worker.start((Platform) null, new AgentWorkerStatusTracker(), doneFuture);
        Assertions.assertEquals("exited with return code 1", doneFuture.get());
        worker.stop((Platform) null);
    }

    /**
     * Starts a worker whose command path cannot exist and expects the
     * done-future to complete with a message beginning
     * {@code "Unable to start process"}.
     */
    @Test
    public void testProcessNotFound() throws Exception {
        ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("notFoundTask")
            .command("/dev/null/non/existent/script/path")
            .build();
        // Typed future: no unchecked cast is needed when reading the result.
        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
        worker.start((Platform) null, new AgentWorkerStatusTracker(), doneFuture);
        Assertions.assertTrue(doneFuture.get().startsWith("Unable to start process"));
        worker.stop((Platform) null);
    }

    /**
     * Starts a long-running {@code sleep} process, stops the worker right
     * away, and expects the done-future to complete with a message beginning
     * {@code "exited with return code "}.
     *
     * <p>Reported as skipped (rather than silently passing) on Windows.
     */
    @Test
    public void testProcessStop() throws Exception {
        Assumptions.assumeFalse(OperatingSystem.IS_WINDOWS, "requires the POSIX 'sleep' command");
        ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("testStopTask")
            .command("sleep", "3600000")
            .build();
        KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
        worker.start((Platform) null, new AgentWorkerStatusTracker(), doneFuture);
        worker.stop((Platform) null);
        // The exact exit code is not asserted, only the message prefix.
        Assertions.assertTrue(doneFuture.get().startsWith("exited with return code "));
    }

    /**
     * Verifies that stopping a worker terminates a process that ignores
     * SIGTERM.
     *
     * <p>The test writes a bash script to a temporary file. The script echoes
     * a line, reads one line from stdin and exits early if it does not match
     * the expected task JSON, installs a SIGTERM trap, prints a status update
     * and then loops forever. The worker is built with a 1 ms shutdown grace
     * period. Once the status tracker has received {@code "green"}, the worker
     * is stopped and the done-future must complete with a message beginning
     * {@code "exited with return code "}.
     *
     * <p>Reported as skipped (rather than silently passing) on Windows. The
     * script file is always deleted, whether the test passes or fails.
     */
    @Test
    public void testProcessForceKillTimeout() throws Exception {
        Assumptions.assumeFalse(OperatingSystem.IS_WINDOWS, "requires bash and POSIX signals");
        File scriptFile = null;
        try {
            scriptFile = TestUtils.tempFile();
            // try-with-resources closes the stream even when a write fails.
            try (OutputStream out = Files.newOutputStream(scriptFile.toPath())) {
                for (String line : new String[] {
                    "echo hello world\n",
                    "# Test that the initial message is sent correctly.\n",
                    "read -r line\n",
                    "[[ $line == '{\"id\":\"testForceKillTask\",\"workload\":{\"foo\":\"value1\",\"bar\":123}}' ]] || exit 0\n",
                    "\n",
                    "# Ignore SIGTERM signals.  This ensures that we test SIGKILL delivery.\n",
                    "trap 'echo SIGTERM' SIGTERM\n",
                    "\n",
                    "# Update the process status.  This will also unblock the junit test.\n",
                    "# It is important that we do this after we disabled SIGTERM, to ensure\n",
                    "# that we are testing SIGKILL.\n",
                    "echo '{\"status\": \"green\", \"log\": \"my log message.\"}'\n",
                    "\n",
                    "# Wait for the SIGKILL.\n",
                    "while true; do sleep 0.01; done\n"
                }) {
                    out.write(line.getBytes(StandardCharsets.UTF_8));
                }
            }

            // Completed with the text value of the first status node the tracker receives.
            CompletableFuture<String> statusFuture = new CompletableFuture<>();
            WorkerStatusTracker statusTracker = status -> statusFuture.complete(status.textValue());

            ExternalCommandWorker worker = new ExternalCommandWorkerBuilder("testForceKillTask")
                .shutdownGracePeriodMs(1)
                .command("bash", scriptFile.getAbsolutePath())
                .build();
            KafkaFutureImpl<String> doneFuture = new KafkaFutureImpl<>();
            worker.start((Platform) null, statusTracker, doneFuture);

            // Block until the script has reported its status, i.e. after the trap is installed.
            Assertions.assertEquals("green", statusFuture.get());
            worker.stop((Platform) null);
            Assertions.assertTrue(doneFuture.get().startsWith("exited with return code "));
        } finally {
            if (scriptFile != null) {
                Files.delete(scriptFile.toPath());
            }
        }
    }
}
