package org.flinkextended.flink.ml.operator.client;

import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.operator.coding.RowCSVCoding;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/client/NodeUtilsTest.class */
public class NodeUtilsTest {
    private StreamTableEnvironment tEnv;
    private StreamStatementSet statementSet;
    private StreamExecutionEnvironment env;

    @Before
    public void setUp() throws Exception {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.tEnv = StreamTableEnvironment.create(this.env);
        this.statementSet = this.tEnv.createStatementSet();
    }

    @Test
    public void testScheduleNodeNoInputNoOutput() throws InterruptedException, ExecutionException {
        ClusterConfig build = ClusterConfig.newBuilder().addNodeType("worker", 2).setNodeEntry(getScriptPathFromResources("greeter.py"), "map_func").build();
        NodeUtils.scheduleAMNode(this.statementSet, build);
        NodeUtils.scheduleNodes(this.statementSet, build, "worker");
        this.statementSet.execute().await();
    }

    @Test
    public void testScheduleNodeWithTwoNodeTypes() throws InterruptedException, ExecutionException {
        ClusterConfig build = ClusterConfig.newBuilder().addNodeType("worker", 2).addNodeType("ps", 2).setNodeEntry(getScriptPathFromResources("greeter.py"), "map_func").build();
        NodeUtils.scheduleAMNode(this.statementSet, build);
        NodeUtils.scheduleNodes(this.statementSet, build, "worker");
        NodeUtils.scheduleNodes(this.statementSet, build, "ps");
        this.statementSet.execute().await();
    }

    @Test
    public void testScheduleNodeNoInputWithOutput() throws ExecutionException, InterruptedException {
        ClusterConfig build = ClusterConfig.newBuilder().addNodeType("worker", 2).setNodeEntry(getScriptPathFromResources("row_output.py"), "map_func").setProperty("sys:decoding_class", RowCSVCoding.class.getName()).setProperty("output_types", "STRING,INT_32").build();
        Schema build2 = Schema.newBuilder().column("name", DataTypes.STRING()).column("value", DataTypes.INT()).build();
        NodeUtils.scheduleAMNode(this.statementSet, build);
        this.statementSet.addInsert(TableDescriptor.forConnector("print").build(), NodeUtils.scheduleNodes(this.tEnv, build, build2, "worker"));
        this.statementSet.execute().await();
    }

    @Test
    public void testScheduleNodeWithInputNoOutput() throws ExecutionException, InterruptedException {
        Table fromDataStream = this.tEnv.fromDataStream(this.env.fromElements(new Tuple2[]{new Tuple2("1", 1), new Tuple2("2", 2), new Tuple2("3", 3), new Tuple2("4", 4)}));
        ClusterConfig build = ClusterConfig.newBuilder().addNodeType("worker", 2).setNodeEntry(getScriptPathFromResources("row_input.py"), "map_func").setProperty("sys:encoding_class", RowCSVCoding.class.getName()).setProperty("input_types", "STRING,INT_32").build();
        NodeUtils.scheduleAMNode(this.statementSet, build);
        NodeUtils.scheduleNodes(this.statementSet, fromDataStream, build, "worker");
        this.statementSet.execute().await();
    }

    @Test
    public void testScheduleNodeWithInputWithOutput() throws ExecutionException, InterruptedException {
        Table fromDataStream = this.tEnv.fromDataStream(this.env.fromElements(new Tuple2[]{new Tuple2("1", 1), new Tuple2("2", 2), new Tuple2("3", 3), new Tuple2("4", 4)}));
        ClusterConfig build = ClusterConfig.newBuilder().addNodeType("worker", 2).setNodeEntry(getScriptPathFromResources("row_input_output.py"), "map_func").setProperty("sys:encoding_class", RowCSVCoding.class.getName()).setProperty("sys:decoding_class", RowCSVCoding.class.getName()).setProperty("input_types", "STRING,INT_32").setProperty("output_types", "STRING,FLOAT_64").build();
        Schema build2 = Schema.newBuilder().column("key", DataTypes.STRING()).column("value", DataTypes.DOUBLE()).build();
        NodeUtils.scheduleAMNode(this.statementSet, build);
        this.statementSet.addInsert(TableDescriptor.forConnector("print").build(), NodeUtils.scheduleNodes(fromDataStream, build, build2, "worker"));
        this.statementSet.execute().await();
    }

    private String getScriptPathFromResources(String str) {
        URL resource = Thread.currentThread().getContextClassLoader().getResource(str);
        Assert.assertNotNull(resource);
        return resource.getPath();
    }
}
