package org.flinkextended.flink.ml.cluster.node.runner.python;

import com.alibaba.fastjson.JSONObject;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import org.flinkextended.flink.ml.TestWithNodeService;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.node.runner.python.log.Slf4JProcessOutputConsumer;
import org.flinkextended.flink.ml.coding.impl.ByteArrayCodingImpl;
import org.flinkextended.flink.ml.data.DataExchange;
import org.flinkextended.flink.ml.util.DummyContext;
import org.flinkextended.flink.ml.util.TestUtil;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that run the Python scripts found under the framework's test Python directory through
 * {@link ProcessPythonRunner}.
 *
 * <p>Every test builds a dummy {@link MLContext} pointing at one script, optionally exchanges
 * data with it through a {@link DataExchange}, and calls {@code runScript()}. The tests contain no
 * explicit assertions: a test fails only if one of the calls throws.
 */
public class ProcessPythonRunnerTest extends TestWithNodeService {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessPythonRunnerTest.class);

    /** Location of the test Python scripts, relative to the project root. */
    private static final String PYTHON_TEST_DIR = "/dl-on-flink-framework/src/test/python";

    /** Name of the Python function registered on the context for every script. */
    private static final String FUNC_NAME = "map_func";

    /** Context property naming the class that consumes the Python process output. */
    private static final String LOGGER_CONSUMER_CLASS_KEY =
            "sys:python_process_logger_consumer_class";

    /** Context property naming the decoding class. */
    private static final String DECODING_CLASS_KEY = "sys:decoding_class";

    /** Context property naming the encoding class. */
    private static final String ENCODING_CLASS_KEY = "sys:encoding_class";

    /**
     * Builds a dummy {@link MLContext} configured to run a single test script.
     *
     * <p>The context's Python directory is set to the test script directory under the project
     * root, its function name to {@code map_func}, and it is finally passed to
     * {@code configureContext}, which is not defined in this class (presumably inherited from
     * {@link TestWithNodeService}).
     *
     * @param scriptName file name of the Python script, e.g. {@code "greeter.py"}
     * @return the configured context
     * @throws Exception if creating or configuring the context fails
     */
    public MLContext createMLContext(String scriptName) throws Exception {
        String pythonDir = TestUtil.getProjectRootPath() + PYTHON_TEST_DIR;
        MLContext mlContext = DummyContext.createDummyMLContext();
        mlContext.setPythonDir(Paths.get(pythonDir));
        mlContext.setPythonFiles(new String[] {scriptName});
        mlContext.setFuncName(FUNC_NAME);
        configureContext(mlContext);
        return mlContext;
    }

    /**
     * Registers {@link ByteArrayCodingImpl} as both the decoding and the encoding class of the
     * given context.
     *
     * @param mlContext context whose properties are updated in place
     */
    private static void useByteArrayCoding(MLContext mlContext) {
        String codingClass = ByteArrayCodingImpl.class.getCanonicalName();
        mlContext.getProperties().put(DECODING_CLASS_KEY, codingClass);
        mlContext.getProperties().put(ENCODING_CLASS_KEY, codingClass);
    }

    /** Runs {@code greeter.py} with the default context configuration. */
    @Test
    public void greeterPythonTest() throws Exception {
        new ProcessPythonRunner(createMLContext("greeter.py")).runScript();
    }

    /**
     * Runs {@code greeter.py} with {@link Slf4JProcessOutputConsumer} configured as the process
     * output consumer class.
     */
    @Test
    public void greeterPythonTestToSlf4j() throws Exception {
        MLContext mlContext = createMLContext("greeter.py");
        mlContext.getProperties()
                .put(LOGGER_CONSUMER_CLASS_KEY, Slf4JProcessOutputConsumer.class.getCanonicalName());
        new ProcessPythonRunner(mlContext).runScript();
    }

    /** Writes one JSON object through a {@link DataExchange}, then runs {@code read_from_java.py}. */
    @Test
    public void pythonReadFromJavaTest() throws Exception {
        MLContext mlContext = createMLContext("read_from_java.py");
        DataExchange dataExchange = new DataExchange(mlContext);
        JSONObject payload = new JSONObject();
        payload.put("a", "a");
        // The record is written before the script starts.
        dataExchange.write(payload);
        new ProcessPythonRunner(mlContext).runScript();
    }

    /** Runs {@code write_to_java.py}, then reads one JSON object back and logs it. */
    @Test
    public void pythonWriteToJavaTest() throws Exception {
        MLContext mlContext = createMLContext("write_to_java.py");
        // The exchange is created before the script runs and read only after it has finished.
        DataExchange dataExchange = new DataExchange(mlContext);
        new ProcessPythonRunner(mlContext).runScript();
        JSONObject result = (JSONObject) dataExchange.read(true);
        LOG.info("res:{}", result.toJSONString());
    }

    /**
     * Writes a byte array through a {@link DataExchange} using the byte-array coding, then runs
     * {@code read_bytes_from_java.py}.
     */
    @Test
    public void pythonReadBytesFromJavaTest() throws Exception {
        MLContext mlContext = createMLContext("read_bytes_from_java.py");
        useByteArrayCoding(mlContext);
        // Explicit charset so the bytes handed to Python do not depend on the platform default.
        new DataExchange(mlContext).write("aaaaa".getBytes(StandardCharsets.UTF_8));
        new ProcessPythonRunner(mlContext).runScript();
    }

    /**
     * Runs {@code write_bytes_to_java.py} using the byte-array coding, then reads one byte array
     * back and logs it as text.
     */
    @Test
    public void pythonWriteBytesToJavaTest() throws Exception {
        MLContext mlContext = createMLContext("write_bytes_to_java.py");
        useByteArrayCoding(mlContext);
        DataExchange dataExchange = new DataExchange(mlContext);
        new ProcessPythonRunner(mlContext).runScript();
        byte[] result = (byte[]) dataExchange.read(true);
        // NOTE(review): decodes as UTF-8 for logging; confirm the script emits UTF-8 text.
        LOG.info("res:{}", new String(result, StandardCharsets.UTF_8));
    }

    /**
     * Writes one JSON object through a {@link DataExchange}, then runs
     * {@code read_json_from_java.py}.
     */
    @Test
    public void pythonReadJsonFromJavaTest() throws Exception {
        MLContext mlContext = createMLContext("read_json_from_java.py");
        DataExchange dataExchange = new DataExchange(mlContext);
        JSONObject payload = new JSONObject();
        payload.put("json_read", "json_read");
        dataExchange.write(payload);
        new ProcessPythonRunner(mlContext).runScript();
    }

    /** Runs {@code write_json_to_java.py}, then reads one JSON object back and logs it. */
    @Test
    public void pythonWriteJsonToJavaTest() throws Exception {
        MLContext mlContext = createMLContext("write_json_to_java.py");
        DataExchange dataExchange = new DataExchange(mlContext);
        new ProcessPythonRunner(mlContext).runScript();
        JSONObject result = (JSONObject) dataExchange.read(true);
        LOG.info("res:{}", result.toJSONString());
    }
}
