package org.flinkextended.flink.ml.examples.tensorflow.it;

import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import org.flinkextended.flink.ml.examples.tensorflow.mnist.MnistDataUtil;
import org.flinkextended.flink.ml.examples.tensorflow.mnist.MnistDist;
import org.flinkextended.flink.ml.examples.util.CodeUtil;
import org.flinkextended.flink.ml.util.MiniCluster;
import org.flinkextended.flink.ml.util.SysUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/flinkextended/flink/ml/examples/tensorflow/it/MnistIT.class */
/**
 * Integration test that submits the {@link MnistDist} example job to a shared
 * {@link MiniCluster} and checks the client output for the success marker.
 *
 * <p>The cluster is started once for the whole class and stopped afterwards;
 * task-manager logs are emptied after every test method.
 */
public class MnistIT {
    private static final String MNIST_FOLDER = "mnist_input";

    /** Path the downloaded MNIST data is copied to via {@link MiniCluster#copyToJM}. */
    public static final String MNIST_CONTAINER_PATH = "/tmp/" + MNIST_FOLDER;

    /** Text that must appear in the {@code flinkRun} output for a run to count as successful. */
    private static final String SUCCESS_MARKER = "Program execution finished";

    private static MiniCluster miniCluster;

    /**
     * Starts the mini cluster, points it at the examples jar for the current project
     * version and copies the downloaded MNIST data to {@link #MNIST_CONTAINER_PATH}.
     *
     * @throws Exception if the cluster cannot be started or the data cannot be prepared
     * @throws IllegalStateException if {@code copyToJM} reports failure
     */
    @BeforeClass
    public static void setUp() throws Exception {
        miniCluster = MiniCluster.start(3);
        miniCluster.setExecJar("/dl-on-flink-examples/target/dl-on-flink-examples-" + SysUtil.getProjectVersion() + ".jar");
        Preconditions.checkState(miniCluster.copyToJM(MnistDataUtil.downloadData(), MNIST_CONTAINER_PATH));
    }

    /**
     * Stops the mini cluster if it was started.
     *
     * @throws Exception if stopping the cluster fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        // Null when MiniCluster.start itself threw in setUp.
        if (miniCluster != null) {
            miniCluster.stop();
        }
    }

    /** Empties the task-manager logs after each test method. */
    @After
    public void clearLogs() {
        miniCluster.emptyTMLogs();
    }

    /** Runs the example in {@code StreamEnv} mode. */
    @Test
    public void mnistDistStream() throws IOException {
        runAndVerify(miniCluster, MnistDist.EnvMode.StreamEnv);
    }

    /** Runs the example in {@code StreamTableEnv} mode. */
    @Test
    public void mnistDistStreamTable() throws IOException {
        runAndVerify(miniCluster, MnistDist.EnvMode.StreamTableEnv);
    }

    /** Convenience overload of {@link #runAndVerify(MiniCluster, MnistDist.EnvMode, boolean)} without restart. */
    private void runAndVerify(MiniCluster cluster, MnistDist.EnvMode envMode) throws IOException {
        runAndVerify(cluster, envMode, false);
    }

    /**
     * Submits {@link MnistDist} to the given cluster, prints the client output, dumps the
     * Flink logs into a fresh temporary directory and fails the test unless the output
     * contains the success marker.
     *
     * @param cluster     cluster to submit the job to
     * @param envMode     value passed as {@code --mode}
     * @param withRestart value passed as {@code --with-restart}
     * @return {@code true} when the output contains the success marker; otherwise the
     *         test is failed through {@link Assert#fail(String)}, which throws
     * @throws IOException if submitting the job or creating the log directory fails
     */
    public static boolean runAndVerify(MiniCluster cluster, MnistDist.EnvMode envMode, boolean withRestart) throws IOException {
        String output = run(cluster, envMode, withRestart);
        System.out.println(output);

        // Logs are dumped for both outcomes, so do it once up front.
        // java.nio is used instead of Guava's deprecated Files.createTempDir();
        // the name is fully qualified because the file imports Guava's Files.
        File logDir = java.nio.file.Files.createTempDirectory("mnist-it-logs").toFile();
        cluster.dumpFlinkLogs(logDir);

        if (output.contains(SUCCESS_MARKER)) {
            System.out.println("logs in " + logDir.getAbsolutePath());
            return true;
        }
        Assert.fail("MnistDist failed in mode " + envMode + ", check logs in " + logDir.getAbsolutePath());
        return false; // Not reached at runtime: Assert.fail always throws.
    }

    /**
     * Builds the command-line arguments for {@link MnistDist} and submits it through
     * {@link MiniCluster#flinkRun}.
     *
     * @return the text returned by {@code flinkRun}
     * @throws IOException propagated unchanged so the test framework reports it
     */
    private static String run(MiniCluster cluster, MnistDist.EnvMode envMode, boolean withRestart) throws IOException {
        String[] args = {
            "--zk-conn-str", cluster.getZKContainer(),
            "--mode", envMode.toString(),
            "--setup", MiniCluster.getLocalBuildDir() + "/dl-on-flink-examples/src/test/python/mnist_data_setup.py",
            "--train", "mnist_dist.py",
            "--envpath", cluster.getVenvHdfsPath(),
            "--mnist-files", MNIST_CONTAINER_PATH,
            "--with-restart", String.valueOf(withRestart),
            "--code-path", CodeUtil.copyCodeToHdfs(cluster)
        };
        return cluster.flinkRun(MnistDist.class.getCanonicalName(), args);
    }
}
