package org.flinkextended.flink.ml.operator.ops.inputformat;

import java.io.IOException;
import java.util.HashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.rpc.NodeServer;
import org.flinkextended.flink.ml.operator.ops.PythonEnvironmentManager;
import org.flinkextended.flink.ml.operator.ops.ResourcesUtils;
import org.flinkextended.flink.ml.operator.util.ColumnInfos;
import org.flinkextended.flink.ml.util.MLException;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/NodeInputFormat.class */
/**
 * Input format bound to a single node type of a {@link ClusterConfig}.
 *
 * <p>It produces one {@link NodeInputSplit} per node configured for its node type, builds the
 * {@link MLContext} for a node, and supplies a {@link NodeServer} as the runnable for that node.
 * How these hooks are driven is decided by {@link AbstractNodeInputFormat}, which is not visible
 * here.
 *
 * @param <OUT> element type, forwarded unchanged to {@link AbstractNodeInputFormat}
 */
public class NodeInputFormat<OUT> extends AbstractNodeInputFormat<OUT> {
    /** Node type this format serves; used as the key into the cluster's node-type count map. */
    private final String nodeType;

    /** Flink configuration handed to the {@link PythonEnvironmentManager}. */
    private final Configuration flinkConfig;

    /**
     * Creates a format for the given node type using an empty Flink {@link Configuration}.
     *
     * @param nodeType node type served by this format
     * @param clusterConfig cluster configuration passed to the superclass
     */
    public NodeInputFormat(String nodeType, ClusterConfig clusterConfig) {
        this(nodeType, clusterConfig, new Configuration());
    }

    /**
     * Creates a format for the given node type.
     *
     * @param nodeType node type served by this format
     * @param clusterConfig cluster configuration passed to the superclass
     * @param flinkConfig Flink configuration later passed to the {@link PythonEnvironmentManager}
     */
    public NodeInputFormat(String nodeType, ClusterConfig clusterConfig, Configuration flinkConfig) {
        super(clusterConfig);
        this.nodeType = nodeType;
        this.flinkConfig = flinkConfig;
    }

    /**
     * Intentionally a no-op: everything this format needs is supplied through its constructor.
     *
     * @param configuration ignored
     */
    public void configure(Configuration configuration) {
    }

    /**
     * Creates one split per node configured for this format's node type.
     *
     * <p>NOTE(review): the name carries a decompiler prefix (originally {@code createInputSplits},
     * merged with its bridge method). It is kept as-is here so existing references keep working;
     * confirm whether it should be restored so that it implements the input-format contract.
     *
     * @param minNumSplits minimum number of splits required by the caller
     * @return an array with one {@link NodeInputSplit} per configured node, each constructed with
     *     the node count and its own index
     * @throws IllegalStateException if no node count is configured for the node type, or if the
     *     configured count is smaller than {@code minNumSplits}
     * @throws IOException declared for interface compatibility; not thrown by this implementation
     */
    /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
    public NodeInputSplit[] m10createInputSplits(int minNumSplits) throws IOException {
        Integer configuredCount = (Integer) this.clusterConfig.getNodeTypeCntMap().get(this.nodeType);
        // Fail with a descriptive message instead of a NullPointerException on unboxing when the
        // node type has no entry in the count map.
        Preconditions.checkState(
                configuredCount != null,
                "No node count is configured for node type %s",
                this.nodeType);

        // Unbox once; the value is needed for the check, the array size and every split.
        final int nodeCount = configuredCount;
        Preconditions.checkState(
                nodeCount >= minNumSplits,
                "The required minimum number of splits: %s is greater than the number of node: %s with type %s",
                minNumSplits,
                nodeCount,
                this.nodeType);

        NodeInputSplit[] splits = new NodeInputSplit[nodeCount];
        for (int index = 0; index < nodeCount; index++) {
            splits[index] = new NodeInputSplit(nodeCount, index);
        }
        return splits;
    }

    /**
     * Opens a {@link PythonEnvironmentManager} and builds the {@link MLContext} for a node.
     *
     * <p>The context properties start from the cluster properties, then receive the
     * {@code gpu_info} entry, then the Python environment properties; later entries overwrite
     * earlier ones with the same key.
     *
     * @param nodeIndex value forwarded to the {@link MLContext} constructor (presumably the index
     *     of this node within its node type; confirm against {@link AbstractNodeInputFormat})
     * @return the newly created context, using {@link ExecutionMode#OTHER}
     * @throws MLException if any step fails; the original exception is kept as the cause
     */
    @Override // org.flinkextended.flink.ml.operator.ops.inputformat.AbstractNodeInputFormat
    protected MLContext prepareMLContext(Integer nodeIndex) throws MLException {
        PythonEnvironmentManager envManager = new PythonEnvironmentManager(this.clusterConfig, this.flinkConfig);
        try {
            envManager.open((StreamingRuntimeContext) getRuntimeContext());

            // Copy the cluster properties so the additions below do not leak back into the
            // shared cluster configuration.
            HashMap<String, String> properties = new HashMap<>(this.clusterConfig.getProperties());
            properties.put("gpu_info", ResourcesUtils.parseGpuInfo(getRuntimeContext()));
            properties.putAll(envManager.getPythonEnvProperties());

            return new MLContext(
                    ExecutionMode.OTHER,
                    this.nodeType,
                    nodeIndex.intValue(),
                    this.clusterConfig.getNodeTypeCntMap(),
                    this.clusterConfig.getEntryFuncName(),
                    properties,
                    this.clusterConfig.getPythonVirtualEnvZipPath(),
                    ColumnInfos.dummy().getNameToTypeMap());
        } catch (Exception e) {
            // Every failure in the block above is reported with this message, not only failures
            // of PythonEnvironmentManager#open; the cause carries the real origin.
            throw new MLException("Fail to open PythonEnvironmentManager", e);
        }
    }

    /**
     * Returns the runnable for the given context.
     *
     * @param mLContext context the server is created with
     * @return a new {@link NodeServer} built from the context and this format's node type
     */
    @Override // org.flinkextended.flink.ml.operator.ops.inputformat.AbstractNodeInputFormat
    protected Runnable getNodeServerRunnable(MLContext mLContext) {
        return new NodeServer(mLContext, this.nodeType);
    }

    /**
     * Returns the node type this format was created for.
     *
     * @return the node type passed to the constructor
     */
    public String getNodeType() {
        return this.nodeType;
    }
}
