package io.pravega.connectors.flink;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaTableSource.class */
/**
 * A table source that exposes Pravega data as {@link Row}s, usable both as a
 * {@link StreamTableSource} and as a {@link BatchTableSource}.
 *
 * <p>The streaming reader and the batch input format are obtained lazily from the
 * suppliers passed to the constructor. Optional processing-time and rowtime attributes
 * can be configured through the setters; each configured attribute is validated against
 * the table schema.
 */
public class FlinkPravegaTableSource implements StreamTableSource<Row>, BatchTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes {
    /** Creates the reader used by {@link #getDataStream(StreamExecutionEnvironment)}. */
    private final Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory;

    /** Creates the input format used by {@link #getDataSet(ExecutionEnvironment)}. */
    private final Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory;

    /** Schema of the produced table. */
    private final TableSchema schema;

    /** Name of the processing-time field, or {@code null} when none is configured. */
    private String proctimeAttribute;

    /**
     * Configured rowtime attributes. Starts out as an empty list (rather than {@code null})
     * so that callers of {@link #getRowtimeAttributeDescriptors()} can always iterate it.
     */
    private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors = Collections.emptyList();

    /**
     * Creates a new table source.
     *
     * @param sourceFunctionFactory supplier of the streaming reader; must not be {@code null}
     * @param inputFormatFactory    supplier of the batch input format; must not be {@code null}
     * @param schema                the table schema, passed through
     *                              {@link TableSchemaUtils#checkNoGeneratedColumns(TableSchema)}
     * @throws NullPointerException if either supplier is {@code null}
     */
    public FlinkPravegaTableSource(
            Supplier<FlinkPravegaReader<Row>> sourceFunctionFactory,
            Supplier<FlinkPravegaInputFormat<Row>> inputFormatFactory,
            TableSchema schema) {
        this.sourceFunctionFactory = Preconditions.checkNotNull(sourceFunctionFactory, "sourceFunctionFactory");
        this.inputFormatFactory = Preconditions.checkNotNull(inputFormatFactory, "inputFormatFactory");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
    }

    /**
     * Obtains a reader from the source-function factory, initializes it and registers it
     * as a source on the given environment, named after {@link #explainSource()}.
     *
     * @param streamExecutionEnvironment the environment the source is added to
     * @return the resulting stream of rows
     */
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment streamExecutionEnvironment) {
        FlinkPravegaReader<Row> reader = this.sourceFunctionFactory.get();
        // The reader is initialized explicitly before it is handed to Flink.
        reader.initialize();
        return streamExecutionEnvironment.addSource(reader).name(explainSource());
    }

    /**
     * Obtains an input format from the input-format factory and creates a batch input
     * from it, named after {@link #explainSource()}.
     *
     * @param executionEnvironment the environment the input is created on
     * @return the resulting data set of rows
     */
    @Override
    public DataSet<Row> getDataSet(ExecutionEnvironment executionEnvironment) {
        return executionEnvironment
                .createInput(this.inputFormatFactory.get(), getProducedTypeInformation())
                .name(explainSource());
    }

    /**
     * @return the row data type derived from the table schema
     */
    @Override
    public DataType getProducedDataType() {
        return this.schema.toRowDataType();
    }

    /** Converts the produced data type to the legacy type information needed by the batch API. */
    @SuppressWarnings("unchecked") // the produced data type is the schema's row type, so the conversion target is Row
    private TypeInformation<Row> getProducedTypeInformation() {
        return (TypeInformation<Row>) TypeConversions.fromDataTypeToLegacyInfo(getProducedDataType());
    }

    /**
     * @return the table schema this source was created with
     */
    @Override
    public TableSchema getTableSchema() {
        return this.schema;
    }

    /**
     * @return the configured processing-time attribute name, or {@code null} if none was set
     */
    @Override
    public String getProctimeAttribute() {
        return this.proctimeAttribute;
    }

    /**
     * @return the configured rowtime attribute descriptors; an empty list if none were set
     */
    @Override
    public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
        return this.rowtimeAttributeDescriptors;
    }

    /**
     * Sets the processing-time attribute.
     *
     * @param proctimeAttribute name of the processing-time field, or {@code null} to clear it
     * @throws ValidationException if the named field is missing from the schema or is not
     *                             of type TIMESTAMP (without time zone)
     */
    public void setProctimeAttribute(String proctimeAttribute) {
        if (proctimeAttribute != null) {
            validateTimestampField("Processing time attribute", proctimeAttribute);
        }
        this.proctimeAttribute = proctimeAttribute;
    }

    /**
     * Sets the rowtime attribute descriptors after validating every referenced field.
     *
     * @param rowtimeAttributeDescriptors the descriptors to use; must not be {@code null}
     * @throws NullPointerException if the list is {@code null}
     * @throws ValidationException  if a referenced field is missing from the schema or is
     *                              not of type TIMESTAMP (without time zone)
     */
    public void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
        Preconditions.checkNotNull(rowtimeAttributeDescriptors, "List of rowtime attributes must not be null.");
        for (RowtimeAttributeDescriptor descriptor : rowtimeAttributeDescriptors) {
            validateTimestampField("Rowtime attribute", descriptor.getAttributeName());
        }
        this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
    }

    /**
     * @return a runtime name generated from this class and the schema's field names
     */
    @Override
    public String explainSource() {
        return TableConnectorUtils.generateRuntimeName(getClass(), this.schema.getFieldNames());
    }

    /**
     * Verifies that {@code fieldName} exists in the schema and has logical type root
     * {@link LogicalTypeRoot#TIMESTAMP_WITHOUT_TIME_ZONE}.
     *
     * @param attributeKind human-readable prefix used in the error message
     * @param fieldName     the schema field to check
     * @throws ValidationException if the field is absent or has a different type
     */
    private void validateTimestampField(String attributeKind, String fieldName) {
        Optional<DataType> fieldType = this.schema.getFieldDataType(fieldName);
        if (!fieldType.isPresent()) {
            throw new ValidationException(attributeKind + " " + fieldName + " is not present in TableSchema.");
        }
        if (fieldType.get().getLogicalType().getTypeRoot() != LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) {
            throw new ValidationException(attributeKind + " " + fieldName + " is not of type TIMESTAMP.");
        }
    }
}
