package org.flinkextended.flink.ml.tensorflow.client;

import java.time.Duration;
import java.util.Map;
import org.apache.curator.test.TestingServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.flinkextended.flink.ml.tensorflow.storage.DummyStorage;
import org.flinkextended.flink.ml.util.SysUtil;
import org.flinkextended.flink.ml.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/tensorflow/client/RunWithFailTest.class */
public class RunWithFailTest {
    private static Logger LOG = LoggerFactory.getLogger(RunWithFailTest.class);
    private static TestingServer server;
    private static final String simple_print = "simple_print.py";
    private static final String failover = "failover.py";
    private static final String failover2 = "failover2.py";

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        server = new TestingServer(2181, true);
    }

    @After
    public void tearDown() throws Exception {
        server.stop();
    }

    private TFConfig buildTFConfig(String str) {
        return buildTFConfig(str, String.valueOf(System.currentTimeMillis()), 2, 1);
    }

    private TFConfig buildTFConfig(String str, String str2, int i, int i2) {
        System.out.println("buildTFConfig: " + SysUtil._FUNC_());
        System.out.println("Current version:" + str2);
        return new TFConfig(i, i2, (Map) null, new String[]{scriptAbsolutePath(str)}, "map_func", (String) null);
    }

    @Test
    public void SimpleStartupTest() throws Exception {
        TFConfig buildTFConfig = buildTFConfig(simple_print, "1", 1, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        TFUtils.train(executionEnvironment, buildTFConfig);
        System.out.println(executionEnvironment.execute().getNetRuntime());
    }

    @Test
    public void WorkerFailoverTest() throws Exception {
        LOG.info("############ Start failover test.");
        TFConfig buildTFConfig = buildTFConfig(failover, String.valueOf(System.currentTimeMillis()), 2, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        TFUtils.train(executionEnvironment, (DataStream) null, buildTFConfig);
        System.out.println(executionEnvironment.execute().getNetRuntime());
        LOG.info("############# Finish failover test.");
    }

    @Test
    public void testFailoverWithFinishedNode() throws Exception {
        TFConfig buildTFConfig = buildTFConfig(failover2, String.valueOf(System.currentTimeMillis()), 2, 1);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        TFUtils.train(executionEnvironment, (DataStream) null, buildTFConfig);
        System.out.println(executionEnvironment.execute().getNetRuntime());
    }

    @Test
    public void testJobTimeout() throws Exception {
        TFConfig buildTFConfig = buildTFConfig(simple_print);
        buildTFConfig.setPsNum(0);
        buildTFConfig.setWorkerNum(1);
        buildTFConfig.getProperties().put("storage_type", "storage_custom");
        buildTFConfig.getProperties().put("storage_impl_class", DummyStorage.class.getCanonicalName());
        buildTFConfig.getProperties().put("am.registry.timeout", String.valueOf(Duration.ofSeconds(10L).toMillis()));
        buildTFConfig.getProperties().put("node.idle.timeout", String.valueOf(Duration.ofSeconds(10L).toMillis()));
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        TFUtils.train(executionEnvironment, (DataStream) null, buildTFConfig);
        this.expectedException.expect(JobExecutionException.class);
        this.expectedException.expectMessage("Job execution failed");
        executionEnvironment.execute();
    }

    private static String scriptAbsolutePath(String str) {
        return TestUtil.getProjectRootPath() + "/dl-on-flink-tensorflow-2.x/src/test/python/" + str;
    }
}
