package org.flinkextended.flink.ml.operator.client;

import com.alibaba.fastjson.JSONObject;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.role.WorkerRole;
import org.flinkextended.flink.ml.operator.coding.RowCSVCoding;
import org.flinkextended.flink.ml.operator.sink.DebugJsonSink;
import org.flinkextended.flink.ml.operator.source.DebugJsonSource;
import org.flinkextended.flink.ml.operator.source.DebugRowSource;
import org.flinkextended.flink.ml.operator.util.DataTypes;
import org.flinkextended.flink.ml.operator.util.PythonFileUtil;
import org.flinkextended.flink.ml.operator.util.TypeUtil;
import org.flinkextended.flink.ml.util.DummyContext;
import org.flinkextended.flink.ml.util.SysUtil;
import org.flinkextended.flink.ml.util.TestUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/client/RoleUtilsTest.class */
public class RoleUtilsTest {
    private static TestingServer testingServer;
    private static String rootPath = TestUtil.getProjectRootPath() + "/dl-on-flink-operator/src/test/python/";
    private static final Logger LOG = LoggerFactory.getLogger(RoleUtilsTest.class);

    @Before
    public void setUp() throws Exception {
        testingServer = new TestingServer(2181, true);
    }

    @After
    public void tearDown() throws Exception {
        testingServer.stop();
    }

    @Test
    public void greeterJob() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 2);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "greeter.py"});
        createDummyMLConfig.setFuncName("map_func");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        RoleUtils.addAMRole(executionEnvironment, createDummyMLConfig);
        RoleUtils.addRole(executionEnvironment, ExecutionMode.TRAIN, (DataStream) null, createDummyMLConfig, (TypeInformation) null, new WorkerRole());
        executionEnvironment.execute();
    }

    @Test
    public void outputJob() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 3);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "output_json.py"});
        createDummyMLConfig.setFuncName("map_func");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        RoleUtils.addAMRole(executionEnvironment, createDummyMLConfig);
        RoleUtils.addRole(executionEnvironment, ExecutionMode.TRAIN, (DataStream) null, createDummyMLConfig, TypeInformation.of(JSONObject.class), new WorkerRole()).addSink(new DebugJsonSink()).setParallelism(3);
        executionEnvironment.execute();
    }

    @Test
    public void inputOutputJob() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 3);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "input_output_json.py"});
        createDummyMLConfig.setFuncName("map_func");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        DataStreamSource parallelism = executionEnvironment.addSource(new DebugJsonSource()).setParallelism(3);
        RoleUtils.addAMRole(executionEnvironment, createDummyMLConfig);
        RoleUtils.addRole(executionEnvironment, ExecutionMode.TRAIN, parallelism, createDummyMLConfig, TypeInformation.of(JSONObject.class), new WorkerRole()).addSink(new DebugJsonSink()).setParallelism(3);
        executionEnvironment.execute();
    }

    @Test
    public void greeterJobTable() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 2);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "greeter.py"});
        createDummyMLConfig.setFuncName("map_func");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        StatementSet createStatementSet = create.createStatementSet();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        RoleUtils.addAMRole(create, createStatementSet, createDummyMLConfig);
        RoleUtils.addRole(create, createStatementSet, ExecutionMode.TRAIN, (Table) null, createDummyMLConfig, (Schema) null, new WorkerRole());
        TableTestUtil.execTableJobCustom(createDummyMLConfig, executionEnvironment, create, createStatementSet);
    }

    @Test
    public void outputJobTable() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 3);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "output_row.py"});
        createDummyMLConfig.setFuncName("map_func");
        createDummyMLConfig.getProperties().put("sys:encoding_class", RowCSVCoding.class.getCanonicalName());
        createDummyMLConfig.getProperties().put("sys:decoding_class", RowCSVCoding.class.getCanonicalName());
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            sb.append(DataTypes.STRING.name()).append(",");
        }
        sb.deleteCharAt(sb.length() - 1);
        createDummyMLConfig.getProperties().put("output_types", sb.toString());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        StatementSet createStatementSet = create.createStatementSet();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        RoleUtils.addAMRole(create, createStatementSet, createDummyMLConfig);
        Schema build = Schema.newBuilder().fromFields(new String[]{"a", "b", "c", "d"}, new DataType[]{org.apache.flink.table.api.DataTypes.STRING(), org.apache.flink.table.api.DataTypes.STRING(), org.apache.flink.table.api.DataTypes.STRING(), org.apache.flink.table.api.DataTypes.STRING()}).build();
        create.createTemporaryTable("row_sink", TableDescriptor.forConnector("TableDebug").schema(build).build());
        createStatementSet.addInsert("row_sink", RoleUtils.addRole(create, createStatementSet, ExecutionMode.TRAIN, (Table) null, createDummyMLConfig, build, new WorkerRole()));
        TableTestUtil.execTableJobCustom(createDummyMLConfig, executionEnvironment, create, createStatementSet);
    }

    @Test
    public void inputOutputJobTable() throws Exception {
        LOG.info("RUN TEST:" + SysUtil._FUNC_());
        MLConfig createDummyMLConfig = DummyContext.createDummyMLConfig();
        createDummyMLConfig.setRoleNum(new WorkerRole().name(), 3);
        createDummyMLConfig.setPythonFiles(new String[]{rootPath + "input_output_row.py"});
        createDummyMLConfig.setFuncName("map_func");
        createDummyMLConfig.getProperties().put("sys:encoding_class", RowCSVCoding.class.getCanonicalName());
        createDummyMLConfig.getProperties().put("sys:decoding_class", RowCSVCoding.class.getCanonicalName());
        StringBuilder sb = new StringBuilder();
        sb.append(DataTypes.INT_32.name()).append(",");
        sb.append(DataTypes.INT_64.name()).append(",");
        sb.append(DataTypes.FLOAT_32.name()).append(",");
        sb.append(DataTypes.FLOAT_64.name()).append(",");
        sb.append(DataTypes.STRING.name());
        createDummyMLConfig.getProperties().put("input_types", sb.toString());
        createDummyMLConfig.getProperties().put("output_types", sb.toString());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        StatementSet createStatementSet = create.createStatementSet();
        PythonFileUtil.registerPythonFiles(executionEnvironment, createDummyMLConfig);
        create.createTemporaryTable("debug_source", TableDescriptor.forConnector("TableDebug").schema(TypeUtil.rowTypeInfoToSchema(DebugRowSource.typeInfo)).build());
        Table from = create.from("debug_source");
        RoleUtils.addAMRole(create, createStatementSet, createDummyMLConfig);
        create.createTemporaryTable("debug_row_sink", TableDescriptor.forConnector("TableDebug").schema(TypeUtil.rowTypeInfoToSchema(DebugRowSource.typeInfo)).build());
        createStatementSet.addInsert("debug_row_sink", RoleUtils.addRole(create, createStatementSet, ExecutionMode.TRAIN, from, createDummyMLConfig, TypeUtil.rowTypeInfoToSchema(DebugRowSource.typeInfo), new WorkerRole()));
        TableTestUtil.execTableJobCustom(createDummyMLConfig, executionEnvironment, create, createStatementSet);
    }
}
