package org.flinkextended.flink.ml.operator.ops;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.PythonConfig;
import org.apache.flink.python.env.PythonDependencyInfo;
import org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.operator.util.ReflectionUtils;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/PythonEnvironmentManager.class */
public class PythonEnvironmentManager {
    private final ClusterConfig clusterConfig;
    private final PythonConfig pythonConfig;
    private Map<String, String> pythonEnv;

    public PythonEnvironmentManager(ClusterConfig clusterConfig, Configuration configuration) {
        this.clusterConfig = clusterConfig;
        this.pythonConfig = new PythonConfig(configuration);
    }

    public void open(StreamingRuntimeContext streamingRuntimeContext) throws Exception {
        ProcessPythonEnvironmentManager processPythonEnvironmentManager = new ProcessPythonEnvironmentManager(PythonDependencyInfo.create(this.pythonConfig, streamingRuntimeContext.getDistributedCache()), streamingRuntimeContext.getTaskManagerRuntimeInfo().getTmpDirectories(), new HashMap(System.getenv()), streamingRuntimeContext.getJobId());
        processPythonEnvironmentManager.open();
        this.pythonEnv = (Map) ReflectionUtils.callMethod(processPythonEnvironmentManager, ProcessPythonEnvironmentManager.class, "getPythonEnv");
    }

    @VisibleForTesting
    String getWorkingDirectory() {
        String str = this.pythonEnv.get("_PYTHON_WORKING_DIR");
        return str != null ? str : System.getProperty("user.dir");
    }

    @VisibleForTesting
    String getPythonPath() {
        String str = this.pythonEnv.get("PYTHONPATH");
        if (str == null) {
            str = "";
        }
        return String.join(":", (String) this.clusterConfig.getProperties().getOrDefault("ENV:PYTHONPATH", ""), str);
    }

    @VisibleForTesting
    String getPythonExec() {
        return this.pythonEnv.get("python");
    }

    public Map<String, String> getPythonEnvProperties() {
        HashMap hashMap = new HashMap();
        hashMap.put("current_work_dir", getWorkingDirectory());
        hashMap.put("ENV:PYTHONPATH", getPythonPath());
        hashMap.put("python.exec", getPythonExec());
        return hashMap;
    }
}
