package org.flinkextended.flink.ml.examples.tensorflow.it;

import com.google.common.io.Files;
import java.io.File;
import java.util.concurrent.FutureTask;
import org.apache.hadoop.fs.Path;
import org.flinkextended.flink.ml.examples.tensorflow.mnist.MnistDataUtil;
import org.flinkextended.flink.ml.examples.tensorflow.mnist.MnistJavaInference;
import org.flinkextended.flink.ml.examples.tensorflow.ut.TFMnistInferenceTest;
import org.flinkextended.flink.ml.util.MiniCluster;
import org.flinkextended.flink.ml.util.SysUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/flinkextended/flink/ml/examples/tensorflow/it/JavaInferenceFailoverIT.class */
public class JavaInferenceFailoverIT {
    private static final int NUM_TM = 3;
    private static final String HDFS_EXPORT_DIR = "/mnist/models/" + new Path(TFMnistInferenceTest.exportPath).getName();
    private static final String HDFS_TEST_DATA_DIR = "/mnist/test";
    private MiniCluster miniCluster;

    @BeforeClass
    public static void prepData() throws Exception {
        MnistDataUtil.prepareData();
        TFMnistInferenceTest.generateModelIfNeeded();
    }

    @Before
    public void setUp() throws Exception {
        this.miniCluster = MiniCluster.start(NUM_TM);
        this.miniCluster.setExecJar("/dl-on-flink-examples/target/dl-on-flink-examples-" + SysUtil.getProjectVersion() + ".jar");
        this.miniCluster.copyFromHostToHDFS(TFMnistInferenceTest.exportPath, HDFS_EXPORT_DIR);
        this.miniCluster.copyFromHostToHDFS(TFMnistInferenceTest.testDataPath, HDFS_TEST_DATA_DIR);
    }

    @After
    public void tearDown() throws Exception {
        if (this.miniCluster != null) {
            this.miniCluster.stop();
        }
    }

    @Test
    public void testKillOneTM() throws Exception {
        FutureTask futureTask = new FutureTask(this::runAndVerify, null);
        Thread thread = new Thread(futureTask);
        thread.setDaemon(true);
        thread.setName(getClass().getSimpleName() + "-JobRunner");
        thread.start();
        Thread.sleep(30000L);
        this.miniCluster.killOneTMWithWorkload();
        futureTask.get();
    }

    private void runAndVerify() {
        String flinkRun = this.miniCluster.flinkRun(MnistJavaInference.class.getCanonicalName(), new String[]{"--model-path", makeHDFSURI(HDFS_EXPORT_DIR), "--test-data", makeHDFSURI(HDFS_TEST_DATA_DIR), "--hadoop-fs", this.miniCluster.getHDFS(), "--num-records", "10000", "--batch-size", "200"});
        System.out.println(flinkRun);
        if (flinkRun.contains("Program execution finished")) {
            return;
        }
        File createTempDir = Files.createTempDir();
        this.miniCluster.dumpFlinkLogs(createTempDir);
        Assert.fail("Job failed, check logs in " + createTempDir.getAbsolutePath());
    }

    private String makeHDFSURI(String str) {
        return this.miniCluster.getHDFS() + str;
    }
}
