package org.flinkextended.flink.ml.operator.client;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.python.util.PythonConfigUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.StatementSetImpl;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.types.Row;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.operator.coding.RowCSVCoding;
import org.flinkextended.flink.ml.operator.ops.NodeOperator;
import org.flinkextended.flink.ml.operator.ops.source.NodeSource;
import org.flinkextended.flink.ml.operator.util.PythonFileUtil;
import org.flinkextended.flink.ml.operator.util.ReflectionUtils;
import org.flinkextended.flink.ml.operator.util.TypeUtil;

/**
 * Static helpers that add cluster nodes, described by a {@link ClusterConfig}, to a Flink job.
 *
 * <p>Nodes are added either as a source (no input) or as a one-input operator consuming the rows
 * of a {@link Table}. The {@code StatementSet} variants discard the node output by inserting it
 * into a table backed by the {@code blackhole} connector; the {@code Schema} variants return the
 * node output as a {@link Table}.
 */
public class NodeUtils {
    /** Output type used when the node output is discarded. */
    public static final TypeInformation<Void> DUMMY_TI = TypeInformation.of(Void.class);

    /**
     * Adds source nodes of the given type to the statement set and discards their output.
     *
     * @param statementSet statement set that receives the insert; its table environment is read
     *     reflectively and must be a {@link StreamTableEnvironmentImpl}
     * @param clusterConfig cluster configuration the nodes are created from
     * @param str node type; used as the operator name and to look up the node count
     */
    public static void scheduleNodes(StatementSet statementSet, ClusterConfig clusterConfig, String str) {
        StreamTableEnvironmentImpl tableEnv = tableEnvironmentOf(statementSet);
        StreamExecutionEnvironment execEnv = tableEnv.execEnv();
        Configuration flinkConfig = mergeConfiguration(execEnv, tableEnv.getConfig());
        SingleOutputStreamOperator<Void> nodes = scheduleNodes(execEnv, clusterConfig, DUMMY_TI, str, flinkConfig);
        addBlackHoleInsert(statementSet, tableEnv.fromDataStream(nodes));
    }

    /**
     * Adds source nodes of the given type and exposes their output as a table.
     *
     * @param streamTableEnvironment table environment; must be a {@link StreamTableEnvironmentImpl}
     * @param clusterConfig cluster configuration the nodes are created from
     * @param schema schema of the rows produced by the nodes; resolved with the environment's
     *     schema resolver
     * @param str node type; used as the operator name and to look up the node count
     * @return table built from the node output stream
     */
    public static Table scheduleNodes(StreamTableEnvironment streamTableEnvironment, ClusterConfig clusterConfig, Schema schema, String str) {
        // Cast once; the implementation class is needed for execEnv() and getCatalogManager().
        StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) streamTableEnvironment;
        StreamExecutionEnvironment execEnv = tableEnv.execEnv();
        return tableEnv.fromDataStream(
                scheduleNodes(
                        execEnv,
                        clusterConfig,
                        TypeUtil.schemaToRowTypeInfo(schema.resolve(tableEnv.getCatalogManager().getSchemaResolver())),
                        str,
                        mergeConfiguration(execEnv, tableEnv.getConfig())));
    }

    /**
     * Adds nodes of the given type that consume the rows of {@code table} and discards their
     * output.
     *
     * @param statementSet statement set that receives the insert; its table environment is read
     *     reflectively and must be a {@link StreamTableEnvironmentImpl}
     * @param table input table fed to the nodes
     * @param clusterConfig cluster configuration the nodes are created from
     * @param str node type; used as the operator name and to look up the node count
     */
    public static void scheduleNodes(StatementSet statementSet, Table table, ClusterConfig clusterConfig, String str) {
        StreamTableEnvironmentImpl tableEnv = tableEnvironmentOf(statementSet);
        StreamExecutionEnvironment execEnv = tableEnv.execEnv();
        DataStream<Row> input = tableEnv.toDataStream(table);
        Configuration flinkConfig = mergeConfiguration(execEnv, tableEnv.getConfig());
        SingleOutputStreamOperator<Void> nodes = scheduleNodes(execEnv, input, clusterConfig, DUMMY_TI, str, flinkConfig);
        addBlackHoleInsert(statementSet, tableEnv.fromDataStream(nodes));
    }

    /**
     * Adds nodes of the given type that consume the rows of {@code table} and exposes their
     * output as a table.
     *
     * @param table input table; must be a {@link TableImpl} created by a
     *     {@link StreamTableEnvironmentImpl}
     * @param clusterConfig cluster configuration the nodes are created from
     * @param schema schema of the rows produced by the nodes; resolved with the environment's
     *     schema resolver
     * @param str node type; used as the operator name and to look up the node count
     * @return table built from the node output stream
     */
    public static Table scheduleNodes(Table table, ClusterConfig clusterConfig, Schema schema, String str) {
        // The explicit downcast is required to reach execEnv() and getCatalogManager().
        StreamTableEnvironmentImpl tableEnv = (StreamTableEnvironmentImpl) ((TableImpl) table).getTableEnvironment();
        StreamExecutionEnvironment execEnv = tableEnv.execEnv();
        return tableEnv.fromDataStream(
                scheduleNodes(
                        execEnv,
                        tableEnv.toDataStream(table),
                        clusterConfig,
                        TypeUtil.schemaToRowTypeInfo(schema.resolve(tableEnv.getCatalogManager().getSchemaResolver())),
                        str,
                        mergeConfiguration(execEnv, tableEnv.getConfig())));
    }

    /**
     * Adds the AM node as a source named {@code "am"} with parallelism 1 and discards its output.
     *
     * @param statementSet statement set that receives the insert; its table environment is read
     *     reflectively and must be a {@link StreamTableEnvironmentImpl}
     * @param clusterConfig cluster configuration the AM node source is created from
     */
    public static void scheduleAMNode(StatementSet statementSet, ClusterConfig clusterConfig) {
        StreamTableEnvironmentImpl tableEnv = tableEnvironmentOf(statementSet);
        SingleOutputStreamOperator<?> amNode =
                tableEnv.execEnv().addSource(NodeSource.createAMNodeSource(clusterConfig)).setParallelism(1).name("am");
        addBlackHoleInsert(statementSet, tableEnv.fromDataStream(amNode));
    }

    /**
     * Adds source nodes of the given type to the stream environment.
     *
     * <p>The Python files listed in {@code clusterConfig} are registered with the environment
     * first, and the node source is created from a copy of the config that records them.
     *
     * @param streamExecutionEnvironment environment the source is added to
     * @param clusterConfig cluster configuration; not modified
     * @param typeInformation type of the records produced by the nodes
     * @param str node type; used as the operator name and to look up the node count
     * @param configuration Flink configuration handed to the node source
     * @return the source operator, with parallelism set to the configured node count
     * @throws NullPointerException if no node count is configured for {@code str}
     * @throws RuntimeException if registering the Python files fails
     */
    @Internal
    public static <T> SingleOutputStreamOperator<T> scheduleNodes(StreamExecutionEnvironment streamExecutionEnvironment, ClusterConfig clusterConfig, TypeInformation<T> typeInformation, String str, Configuration configuration) {
        ClusterConfig configWithFiles = withRegisteredPythonFiles(streamExecutionEnvironment, clusterConfig);
        // Resolve the parallelism first so a missing node count fails before the job graph is touched.
        int parallelism = requireNodeCount(configWithFiles, str);
        return streamExecutionEnvironment
                .addSource(NodeSource.createNodeSource(str, configWithFiles, typeInformation, configuration))
                .setParallelism(parallelism)
                .name(str);
    }

    /**
     * Adds nodes of the given type as a one-input operator on {@code dataStream}.
     *
     * <p>The Python files listed in {@code clusterConfig} are registered with the environment
     * first, and the operator is created from a copy of the config that records them.
     *
     * @param streamExecutionEnvironment environment the Python files are registered with
     * @param dataStream input rows fed to the nodes
     * @param clusterConfig cluster configuration; not modified
     * @param typeInformation type of the records produced by the nodes
     * @param str node type; used as the operator name and to look up the node count
     * @param configuration Flink configuration handed to the node operator
     * @return the transformed stream, with parallelism set to the configured node count
     * @throws NullPointerException if no node count is configured for {@code str}
     * @throws RuntimeException if registering the Python files fails
     */
    @Internal
    public static <T> SingleOutputStreamOperator<T> scheduleNodes(StreamExecutionEnvironment streamExecutionEnvironment, DataStream<Row> dataStream, ClusterConfig clusterConfig, TypeInformation<T> typeInformation, String str, Configuration configuration) {
        ClusterConfig configWithFiles = withRegisteredPythonFiles(streamExecutionEnvironment, clusterConfig);
        // Resolve the parallelism first so a missing node count fails before the job graph is touched.
        int parallelism = requireNodeCount(configWithFiles, str);
        return dataStream
                .transform(str, typeInformation, new NodeOperator<>(str, configWithFiles, configuration))
                .setParallelism(parallelism);
    }

    /**
     * Merges the stream environment's configuration with the table configuration by delegating to
     * {@link PythonConfigUtil#getMergedConfig}.
     *
     * @param streamExecutionEnvironment stream environment to read configuration from
     * @param tableConfig table configuration to merge in
     * @return the merged configuration
     */
    @Internal
    public static Configuration mergeConfiguration(StreamExecutionEnvironment streamExecutionEnvironment, TableConfig tableConfig) {
        return PythonConfigUtil.getMergedConfig(streamExecutionEnvironment, tableConfig);
    }

    /** Reads the private {@code tableEnvironment} field of a {@link StatementSetImpl}. */
    private static StreamTableEnvironmentImpl tableEnvironmentOf(StatementSet statementSet) {
        return (StreamTableEnvironmentImpl) ReflectionUtils.getFieldValue(statementSet, StatementSetImpl.class, "tableEnvironment");
    }

    /** Inserts {@code table} into a table backed by the {@code blackhole} connector. */
    private static void addBlackHoleInsert(StatementSet statementSet, Table table) {
        statementSet.addInsert(TableDescriptor.forConnector("blackhole").build(), table);
    }

    /** Registers the config's Python files and returns a config copy that records them. */
    private static ClusterConfig withRegisteredPythonFiles(StreamExecutionEnvironment streamExecutionEnvironment, ClusterConfig clusterConfig) {
        ClusterConfig.Builder<?> builder = clusterConfig.toBuilder();
        registerFileToFlinkCache(streamExecutionEnvironment, clusterConfig.getPythonFilePathsList(), builder);
        return builder.build();
    }

    /** Returns the node count configured for {@code nodeType}, failing with a message if absent. */
    private static int requireNodeCount(ClusterConfig clusterConfig, String nodeType) {
        return Objects.requireNonNull(
                        clusterConfig.getNodeCount(nodeType),
                        "No node count configured for node type '" + nodeType + "'")
                .intValue();
    }

    /**
     * Registers the given Python files with the stream environment and records the result on the
     * builder: {@code python_files} is set to the registered entries joined with
     * {@link RowCSVCoding#TYPES_SPLIT_CONFIG}, and {@code use_distribute_cache} is set to
     * {@code "true"}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if registration fails
     */
    private static void registerFileToFlinkCache(StreamExecutionEnvironment streamExecutionEnvironment, List<String> list, ClusterConfig.Builder<?> builder) {
        try {
            String[] pythonFiles = list.toArray(new String[0]);
            builder.setProperty("python_files", Joiner.on(RowCSVCoding.TYPES_SPLIT_CONFIG).join(PythonFileUtil.registerPythonLibFilesIfNotExist(streamExecutionEnvironment, pythonFiles)));
            builder.setProperty("use_distribute_cache", "true");
        } catch (IOException e) {
            throw new RuntimeException("Fail to register python files to Flink job", e);
        }
    }
}
