package org.flinkextended.flink.ml.operator.ops.table;

import java.io.Serializable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.role.BaseRole;
import org.flinkextended.flink.ml.operator.ops.source.NodeSource;
import org.flinkextended.flink.ml.operator.util.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/table/MLTableSource.class */
public class MLTableSource implements StreamTableSource<Row>, Serializable {
    private final MLConfig config;
    private final ExecutionMode mode;
    private final BaseRole role;
    private final RowTypeInfo rowType;
    private final int parallelism;
    private static Logger LOG = LoggerFactory.getLogger(MLTableSource.class);

    public MLTableSource(ExecutionMode executionMode, BaseRole baseRole, MLConfig mLConfig, TableSchema tableSchema, int i) {
        this.mode = executionMode;
        this.config = mLConfig;
        this.role = baseRole;
        this.rowType = TypeUtil.schemaToRowTypeInfo(tableSchema);
        this.parallelism = i;
    }

    public MLTableSource(ExecutionMode executionMode, BaseRole baseRole, MLConfig mLConfig, TableSchema tableSchema) {
        this(executionMode, baseRole, mLConfig, tableSchema, -1);
    }

    private MLTableSource(ExecutionMode executionMode, BaseRole baseRole, MLConfig mLConfig, RowTypeInfo rowTypeInfo, int i) {
        this.mode = executionMode;
        this.config = mLConfig;
        this.role = baseRole;
        this.rowType = rowTypeInfo;
        this.parallelism = i;
    }

    public TypeInformation<Row> getReturnType() {
        return this.rowType;
    }

    public TableSchema getTableSchema() {
        return TypeUtil.rowTypeInfoToTableSchema(this.rowType);
    }

    public String explainSource() {
        return (String) this.config.getProperties().getOrDefault("flink.vertex.name", this.role.name());
    }

    public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        DataStreamSource addSource = streamExecutionEnvironment.addSource(NodeSource.createSource(this.mode, this.role, this.config, this.rowType));
        if (this.parallelism > 0) {
            addSource = addSource.setParallelism(this.parallelism);
        }
        return addSource.name(explainSource());
    }
}
