package io.pravega.connectors.flink.dynamic.table;

import io.pravega.connectors.flink.FlinkPravegaInputFormat;
import io.pravega.connectors.flink.FlinkPravegaReader;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:io/pravega/connectors/flink/dynamic/table/FlinkPravegaDynamicTableSource.class */
public class FlinkPravegaDynamicTableSource implements ScanTableSource, SupportsReadingMetadata {
    private static final String FORMAT_METADATA_PREFIX = "from_format.";
    protected DataType producedDataType;
    private final DataType physicalDataType;
    private List<String> metadataKeys;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;

    @Nullable
    private final String readerGroupName;
    private final PravegaConfig pravegaConfig;
    private final List<StreamWithBoundaries> streams;
    private final long readerGroupRefreshTimeMillis;
    private final long checkpointInitiateTimeoutMillis;
    private final long eventReadTimeoutMillis;
    private final int maxOutstandingCheckpointRequest;

    @Nullable
    private final String uid;
    private final boolean isStreamingReader;
    private final boolean isBounded;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pravega/connectors/flink/dynamic/table/FlinkPravegaDynamicTableSource$ReadableMetadata.class */
    public enum ReadableMetadata {
        EVENT_POINTER("event_pointer", DataTypes.BYTES().notNull());

        final String key;
        final DataType dataType;

        ReadableMetadata(String str, DataType dataType) {
            this.key = str;
            this.dataType = dataType;
        }
    }

    public FlinkPravegaDynamicTableSource(DataType dataType, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, String str, PravegaConfig pravegaConfig, List<StreamWithBoundaries> list, long j, long j2, long j3, int i, String str2, boolean z, boolean z2) {
        this(dataType, dataType, Collections.emptyList(), decodingFormat, str, pravegaConfig, list, j, j2, j3, i, str2, z, z2);
    }

    FlinkPravegaDynamicTableSource(DataType dataType, DataType dataType2, List<String> list, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, String str, PravegaConfig pravegaConfig, List<StreamWithBoundaries> list2, long j, long j2, long j3, int i, String str2, boolean z, boolean z2) {
        this.physicalDataType = (DataType) Preconditions.checkNotNull(dataType, "Physical data type must not be null.");
        this.producedDataType = (DataType) Preconditions.checkNotNull(dataType2, "Produced data type must not be null.");
        this.decodingFormat = (DecodingFormat) Preconditions.checkNotNull(decodingFormat, "Decoding format must not be null.");
        this.metadataKeys = (List) Preconditions.checkNotNull(list, "Metadata Keys must not be null.");
        this.readerGroupName = str;
        this.pravegaConfig = (PravegaConfig) Preconditions.checkNotNull(pravegaConfig, "Pravega config must not be null.");
        this.streams = (List) Preconditions.checkNotNull(list2, "Source streams must not be null.");
        this.readerGroupRefreshTimeMillis = j;
        this.checkpointInitiateTimeoutMillis = j2;
        this.eventReadTimeoutMillis = j3;
        this.maxOutstandingCheckpointRequest = i;
        this.uid = str2;
        this.isStreamingReader = z;
        this.isBounded = z2;
    }

    public ChangelogMode getChangelogMode() {
        return this.decodingFormat.getChangelogMode();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
        FlinkPravegaDynamicDeserializationSchema flinkPravegaDynamicDeserializationSchema = new FlinkPravegaDynamicDeserializationSchema(scanContext.createTypeInformation(this.producedDataType), this.producedDataType.getChildren().size() - this.metadataKeys.size(), this.metadataKeys, (DeserializationSchema) this.decodingFormat.createRuntimeDecoder(scanContext, this.physicalDataType));
        if (!this.isStreamingReader) {
            FlinkPravegaInputFormat.Builder withDeserializationSchema = FlinkPravegaInputFormat.builder().withPravegaConfig(this.pravegaConfig).withDeserializationSchema(flinkPravegaDynamicDeserializationSchema);
            for (StreamWithBoundaries streamWithBoundaries : this.streams) {
                withDeserializationSchema.forStream(streamWithBoundaries.getStream(), streamWithBoundaries.getFrom(), streamWithBoundaries.getTo());
            }
            return InputFormatProvider.of(withDeserializationSchema.build());
        }
        FlinkPravegaReader.Builder withMaxOutstandingCheckpointRequest = ((FlinkPravegaReader.Builder) FlinkPravegaReader.builder().withPravegaConfig(this.pravegaConfig)).withDeserializationSchema(flinkPravegaDynamicDeserializationSchema).withReaderGroupRefreshTime(Time.milliseconds(this.readerGroupRefreshTimeMillis)).withCheckpointInitiateTimeout(Time.milliseconds(this.checkpointInitiateTimeoutMillis)).withEventReadTimeout(Time.milliseconds(this.eventReadTimeoutMillis)).withMaxOutstandingCheckpointRequest(this.maxOutstandingCheckpointRequest);
        Optional ofNullable = Optional.ofNullable(this.readerGroupName);
        Objects.requireNonNull(withMaxOutstandingCheckpointRequest);
        ofNullable.ifPresent(withMaxOutstandingCheckpointRequest::withReaderGroupName);
        for (StreamWithBoundaries streamWithBoundaries2 : this.streams) {
            withMaxOutstandingCheckpointRequest.forStream(streamWithBoundaries2.getStream(), streamWithBoundaries2.getFrom(), streamWithBoundaries2.getTo());
        }
        withMaxOutstandingCheckpointRequest.uid(this.uid == null ? withMaxOutstandingCheckpointRequest.generateUid() : this.uid);
        return SourceFunctionProvider.of(withMaxOutstandingCheckpointRequest.build(), this.isBounded);
    }

    public DynamicTableSource copy() {
        return new FlinkPravegaDynamicTableSource(this.physicalDataType, this.producedDataType, this.metadataKeys, this.decodingFormat, this.readerGroupName, this.pravegaConfig, this.streams, this.readerGroupRefreshTimeMillis, this.checkpointInitiateTimeoutMillis, this.eventReadTimeoutMillis, this.maxOutstandingCheckpointRequest, this.uid, this.isStreamingReader, this.isBounded);
    }

    public String asSummaryString() {
        return "Pravega";
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        FlinkPravegaDynamicTableSource flinkPravegaDynamicTableSource = (FlinkPravegaDynamicTableSource) obj;
        return this.readerGroupRefreshTimeMillis == flinkPravegaDynamicTableSource.readerGroupRefreshTimeMillis && this.checkpointInitiateTimeoutMillis == flinkPravegaDynamicTableSource.checkpointInitiateTimeoutMillis && this.eventReadTimeoutMillis == flinkPravegaDynamicTableSource.eventReadTimeoutMillis && this.maxOutstandingCheckpointRequest == flinkPravegaDynamicTableSource.maxOutstandingCheckpointRequest && this.isStreamingReader == flinkPravegaDynamicTableSource.isStreamingReader && this.isBounded == flinkPravegaDynamicTableSource.isBounded && this.producedDataType.equals(flinkPravegaDynamicTableSource.producedDataType) && this.physicalDataType.equals(flinkPravegaDynamicTableSource.physicalDataType) && this.decodingFormat.equals(flinkPravegaDynamicTableSource.decodingFormat) && this.metadataKeys.equals(flinkPravegaDynamicTableSource.metadataKeys) && Objects.equals(this.readerGroupName, flinkPravegaDynamicTableSource.readerGroupName) && this.pravegaConfig.equals(flinkPravegaDynamicTableSource.pravegaConfig) && this.streams.equals(flinkPravegaDynamicTableSource.streams) && Objects.equals(this.uid, flinkPravegaDynamicTableSource.uid);
    }

    public int hashCode() {
        return Objects.hash(this.producedDataType, this.physicalDataType, this.decodingFormat, this.metadataKeys, this.readerGroupName, this.pravegaConfig, this.streams, Long.valueOf(this.readerGroupRefreshTimeMillis), Long.valueOf(this.checkpointInitiateTimeoutMillis), Long.valueOf(this.eventReadTimeoutMillis), Integer.valueOf(this.maxOutstandingCheckpointRequest), this.uid, Boolean.valueOf(this.isStreamingReader), Boolean.valueOf(this.isBounded));
    }

    public Map<String, DataType> listReadableMetadata() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        this.decodingFormat.listReadableMetadata().forEach((str, dataType) -> {
            linkedHashMap.put(FORMAT_METADATA_PREFIX + str, dataType);
        });
        Stream.of((Object[]) ReadableMetadata.values()).forEachOrdered(readableMetadata -> {
            linkedHashMap.put(readableMetadata.key, readableMetadata.dataType);
        });
        return linkedHashMap;
    }

    public void applyReadableMetadata(List<String> list, DataType dataType) {
        Map map = (Map) list.stream().collect(Collectors.partitioningBy(str -> {
            return str.startsWith(FORMAT_METADATA_PREFIX);
        }));
        if (this.decodingFormat.listReadableMetadata().size() > 0) {
            this.decodingFormat.applyReadableMetadata((List) ((List) map.get(true)).stream().map(str2 -> {
                return str2.substring(FORMAT_METADATA_PREFIX.length());
            }).collect(Collectors.toList()));
        }
        this.metadataKeys = (List) map.get(false);
        this.producedDataType = dataType;
    }
}
