package org.flinkextended.flink.ml.operator.client;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.types.Row;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.role.AMRole;
import org.flinkextended.flink.ml.cluster.role.BaseRole;
import org.flinkextended.flink.ml.operator.ops.MLFlatMapOp;
import org.flinkextended.flink.ml.operator.ops.sink.DummySink;
import org.flinkextended.flink.ml.operator.ops.source.NodeSource;
import org.flinkextended.flink.ml.operator.util.TypeUtil;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/client/RoleUtils.class */
/**
 * Static helpers that add machine-learning cluster roles to a Flink job, either through the
 * DataStream API or through the Table API.
 *
 * <p>A role is added as a source (when it has no upstream input) or as a flat-map over the
 * upstream input. When the caller does not supply an output type/schema, a dummy output type is
 * used and the role's output is routed into a dummy sink.
 *
 * <p>The Table API overloads cast the supplied {@link TableEnvironment} to
 * {@link StreamTableEnvironmentImpl}, {@link TableEnvironmentInternal} and
 * {@link StreamTableEnvironment}; passing an environment that is not of those types results in a
 * {@link ClassCastException}.
 */
public class RoleUtils {
    /** Property set to {@code "true"} on the {@link MLConfig} when a role is given an upstream input. */
    private static final String JOB_HAS_INPUT_KEY = "job_has_input";
    /** Operator name given to the dummy sink attached by the DataStream {@code addRole}. */
    private static final String DUMMY_SINK_NAME = "Dummy sink";
    /** Connector identifier used for the temporary dummy sink tables. */
    private static final String DUMMY_TABLE_CONNECTOR = "DummyTable";
    /** Suffix appended to the role name to form the role's temporary sink table name. */
    private static final String TABLE_SINK_SUFFIX = "_table_sink";
    /** Name of the temporary sink table used by the Table API {@code addAMRole}. */
    private static final String AM_TABLE_SINK_NAME = "am_table_stream_sink";

    /** Output type used when the caller passes no output type information. */
    private static final TypeInformation<String> DUMMY_TI = getTypeInfo(String.class);
    /** Single string column schema used when the caller passes no output schema. */
    static final Schema DUMMY_SCHEMA = Schema.newBuilder().column("a", DataTypes.STRING()).build();

    /**
     * Adds a role to a DataStream job.
     *
     * <p>If {@code dataStream} is non-null, the property {@code job_has_input=true} is added to
     * {@code mLConfig} and the role is applied as a flat-map over the stream; otherwise the role is
     * added as a source. If {@code typeInformation} is null, the dummy string type is used and a
     * {@link DummySink} is attached to the role's output.
     *
     * @param streamExecutionEnvironment environment the source is added to when there is no input
     * @param executionMode execution mode handed to the role operator
     * @param dataStream upstream input, or {@code null} if the role has none
     * @param mLConfig cluster configuration; its role parallelism map must contain the role's name
     * @param typeInformation output type, or {@code null} if the output is to be discarded
     * @param baseRole role to add
     * @return the role's output stream
     * @throws NullPointerException if no parallelism is configured for {@code baseRole}
     */
    public static <IN, OUT> DataStream<OUT> addRole(StreamExecutionEnvironment streamExecutionEnvironment, ExecutionMode executionMode, DataStream<IN> dataStream, MLConfig mLConfig, TypeInformation<OUT> typeInformation, BaseRole baseRole) {
        if (dataStream != null) {
            mLConfig.addProperty(JOB_HAS_INPUT_KEY, "true");
        }
        // Raw types are kept on purpose: the generic signatures of NodeSource / MLFlatMapOp are
        // declared elsewhere and the output type may be the dummy String type instead of OUT.
        TypeInformation outputType = typeInformation == null ? DUMMY_TI : typeInformation;
        int parallelism = roleParallelism(mLConfig, baseRole);

        SingleOutputStreamOperator roleOperator;
        if (dataStream == null) {
            roleOperator = streamExecutionEnvironment
                    .addSource(NodeSource.createSource(executionMode, baseRole, mLConfig, outputType))
                    .setParallelism(parallelism)
                    .name(baseRole.name());
        } else {
            roleOperator = dataStream
                    .flatMap(new MLFlatMapOp(executionMode, baseRole, mLConfig, dataStream.getType(), outputType))
                    .setParallelism(parallelism)
                    .name(baseRole.name());
        }

        // No caller-supplied output type: terminate the role's output in a dummy sink.
        if (typeInformation == null && roleOperator != null) {
            roleOperator.addSink(new DummySink()).setParallelism(parallelism).name(DUMMY_SINK_NAME);
        }
        return roleOperator;
    }

    /**
     * Adds the application-master role to a DataStream job as a parallelism-1 source whose output
     * goes to a parallelism-1 {@link DummySink}.
     *
     * @param streamExecutionEnvironment environment the source is added to
     * @param mLConfig cluster configuration handed to the source
     */
    public static void addAMRole(StreamExecutionEnvironment streamExecutionEnvironment, MLConfig mLConfig) {
        AMRole amRole = new AMRole();
        streamExecutionEnvironment
                .addSource(NodeSource.createSource(ExecutionMode.OTHER, amRole, mLConfig, DUMMY_TI))
                .setParallelism(1)
                .name(amRole.name())
                .addSink(new DummySink())
                .setParallelism(1);
    }

    /**
     * Adds a role to a Table API job.
     *
     * <p>If {@code table} is non-null, the property {@code job_has_input=true} is added to
     * {@code mLConfig} and the role is applied as a flat-map over the table converted to a stream;
     * otherwise the role is added as a source. If {@code schema} is null, {@link #DUMMY_SCHEMA} is
     * used, a temporary table named {@code <role name>_table_sink} is created, and an insert of the
     * role's output into that table is added to {@code statementSet}.
     *
     * @param tableEnvironment table environment (cast to the stream/internal implementations)
     * @param statementSet statement set that receives the dummy-sink insert when needed
     * @param executionMode execution mode handed to the role operator
     * @param table upstream input, or {@code null} if the role has none
     * @param mLConfig cluster configuration; its role parallelism map must contain the role's name
     * @param schema output schema, or {@code null} if the output is to be discarded
     * @param baseRole role to add
     * @return the role's output table
     * @throws NullPointerException if no parallelism is configured for {@code baseRole}
     */
    public static Table addRole(TableEnvironment tableEnvironment, StatementSet statementSet, ExecutionMode executionMode, Table table, MLConfig mLConfig, Schema schema, BaseRole baseRole) {
        if (table != null) {
            mLConfig.addProperty(JOB_HAS_INPUT_KEY, "true");
        }
        ResolvedSchema outputSchema = resolveSchema(schema != null ? schema : DUMMY_SCHEMA, tableEnvironment);
        int parallelism = roleParallelism(mLConfig, baseRole);

        Table roleTable;
        if (table == null) {
            roleTable = dsToTable(
                    ((StreamTableEnvironmentImpl) tableEnvironment).execEnv()
                            .addSource(NodeSource.createSource(executionMode, baseRole, mLConfig, TypeUtil.schemaToRowTypeInfo(outputSchema)))
                            .setParallelism(parallelism)
                            .name(baseRole.name()),
                    tableEnvironment);
        } else {
            DataStream<Row> inputStream = tableToDS(table, tableEnvironment);
            roleTable = dsToTable(
                    inputStream
                            .flatMap(new MLFlatMapOp(executionMode, baseRole, mLConfig, inputStream.getType(), TypeUtil.schemaToRowTypeInfo(outputSchema)))
                            .setParallelism(parallelism)
                            .name(baseRole.name()),
                    tableEnvironment);
        }

        // No caller-supplied schema: route the role's output into a temporary dummy table.
        if (schema == null && roleTable != null) {
            String sinkTableName = baseRole.name() + TABLE_SINK_SUFFIX;
            tableEnvironment.createTemporaryTable(sinkTableName, TableDescriptor.forConnector(DUMMY_TABLE_CONNECTOR).schema(DUMMY_SCHEMA).build());
            statementSet.addInsert(sinkTableName, roleTable);
        }
        return roleTable;
    }

    /**
     * Adds the application-master role to a Table API job as a parallelism-1 source with the dummy
     * schema, and adds an insert of its output into the temporary table
     * {@code am_table_stream_sink} to {@code statementSet}.
     *
     * @param tableEnvironment table environment (cast to the stream/internal implementations)
     * @param statementSet statement set that receives the insert
     * @param mLConfig cluster configuration handed to the source
     */
    public static void addAMRole(TableEnvironment tableEnvironment, StatementSet statementSet, MLConfig mLConfig) {
        AMRole amRole = new AMRole();
        Table amTable = dsToTable(
                ((StreamTableEnvironmentImpl) tableEnvironment).execEnv()
                        .addSource(NodeSource.createSource(ExecutionMode.OTHER, amRole, mLConfig, TypeUtil.schemaToRowTypeInfo(resolveSchema(DUMMY_SCHEMA, tableEnvironment))))
                        .name(amRole.name())
                        .setParallelism(1),
                tableEnvironment);
        tableEnvironment.createTemporaryTable(AM_TABLE_SINK_NAME, TableDescriptor.forConnector(DUMMY_TABLE_CONNECTOR).schema(DUMMY_SCHEMA).build());
        statementSet.addInsert(AM_TABLE_SINK_NAME, amTable);
    }

    /**
     * Looks up the configured parallelism for a role.
     *
     * <p>Fails with a descriptive {@link NullPointerException} instead of an anonymous unboxing
     * failure when the role has no entry in the parallelism map.
     */
    private static int roleParallelism(MLConfig mLConfig, BaseRole baseRole) {
        Integer configured = (Integer) mLConfig.getRoleParallelismMap().get(baseRole.name());
        if (configured == null) {
            throw new NullPointerException("No parallelism configured for role '" + baseRole.name() + "' in the MLConfig role parallelism map");
        }
        return configured.intValue();
    }

    /** Resolves a schema with the schema resolver of the table environment's catalog manager. */
    private static ResolvedSchema resolveSchema(Schema schema, TableEnvironment tableEnvironment) {
        return schema.resolve(((TableEnvironmentInternal) tableEnvironment).getCatalogManager().getSchemaResolver());
    }

    /** Returns the type information for {@code cls}, or {@code null} when {@code cls} is null. */
    private static <OUT> TypeInformation<OUT> getTypeInfo(Class<OUT> cls) {
        if (cls == null) {
            return null;
        }
        return TypeInformation.of(cls);
    }

    /**
     * Converts a stream of rows into a table.
     *
     * @param dataStream stream to convert
     * @param tableEnvironment environment, which must be a {@link StreamTableEnvironment}
     * @return the table created from the stream
     */
    public static Table dsToTable(DataStream<Row> dataStream, TableEnvironment tableEnvironment) {
        return ((StreamTableEnvironment) tableEnvironment).fromDataStream(dataStream);
    }

    /**
     * Converts a table into an append stream of rows, using the row type derived from the table's
     * own schema.
     *
     * @param table table to convert; may be {@code null}
     * @param tableEnvironment environment, which must be a {@link StreamTableEnvironment}
     * @return the resulting stream, or {@code null} when {@code table} is null
     */
    public static DataStream<Row> tableToDS(Table table, TableEnvironment tableEnvironment) {
        if (table == null) {
            return null;
        }
        return ((StreamTableEnvironment) tableEnvironment).toAppendStream(table, TypeUtil.schemaToRowTypeInfo(table.getSchema()));
    }
}
