package io.pravega.connectors.flink;

import io.pravega.client.stream.StreamCut;
import io.pravega.connectors.flink.table.descriptors.Pravega;
import io.pravega.connectors.flink.table.descriptors.PravegaValidator;
import io.pravega.connectors.flink.util.ConnectorConfigurations;
import io.pravega.connectors.flink.util.StreamWithBoundaries;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

/* loaded from: input_file:io/pravega/connectors/flink/FlinkPravegaTableFactoryBase.class */
/**
 * Shared base for the Pravega table factories.
 *
 * <p>It declares the connector context and the property keys the factories accept, validates
 * the incoming property map, and builds {@link FlinkPravegaTableSource} and
 * {@link FlinkPravegaTableSink} instances from it. Subclasses supply the connector version and
 * state whether they target a streaming environment.
 */
public abstract class FlinkPravegaTableFactoryBase {

    /**
     * Returns the context entries that identify this connector.
     *
     * @return a new mutable map holding the connector type, the property version and the
     *         connector version reported by {@link #getVersion()}
     */
    public Map<String, String> getRequiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", Pravega.CONNECTOR_TYPE_VALUE_PRAVEGA);
        context.put("connector.property-version", "1");
        context.put("connector.version", getVersion());
        return context;
    }

    /**
     * Returns the property keys this factory accepts.
     *
     * <p>The keys are grouped as: connection settings, reader settings, writer settings,
     * schema settings and a wildcard for format settings.
     *
     * @return a new mutable list of supported property keys
     */
    public List<String> getSupportedProperties() {
        List<String> properties = new ArrayList<>();

        // Connection configuration.
        properties.add(Pravega.CONNECTOR_METRICS);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_CONTROLLER_URI);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_DEFAULT_SCOPE);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_SECURITY);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TYPE);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_SECURITY_AUTH_TOKEN);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_SECURITY_VALIDATE_HOSTNAME);
        properties.add(Pravega.CONNECTOR_CONNECTION_CONFIG_SECURITY_TRUST_STORE);

        // Reader configuration.
        properties.add(Pravega.CONNECTOR_READER);
        properties.add(Pravega.CONNECTOR_READER_STREAM_INFO);
        properties.add("connector.reader.stream-info.#.scope");
        properties.add("connector.reader.stream-info.#.stream");
        properties.add("connector.reader.stream-info.#.start-streamcut");
        properties.add("connector.reader.stream-info.#.end-streamcut");
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_UID);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_SCOPE);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_NAME);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_REFRESH_INTERVAL);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_EVENT_READ_TIMEOUT_INTERVAL);
        properties.add(Pravega.CONNECTOR_READER_READER_GROUP_CHECKPOINT_INITIATE_TIMEOUT_INTERVAL);
        properties.add(Pravega.CONNECTOR_READER_USER_TIMESTAMP_ASSIGNER);

        // Writer configuration.
        properties.add(Pravega.CONNECTOR_WRITER);
        properties.add(Pravega.CONNECTOR_WRITER_SCOPE);
        properties.add(Pravega.CONNECTOR_WRITER_STREAM);
        properties.add(Pravega.CONNECTOR_WRITER_MODE);
        properties.add(Pravega.CONNECTOR_WRITER_TXN_LEASE_RENEWAL_INTERVAL);
        properties.add(Pravega.CONNECTOR_WRITER_ENABLE_WATERMARK);
        properties.add(Pravega.CONNECTOR_WRITER_ROUTING_KEY_FILED_NAME);

        // Schema configuration.
        properties.add("schema.#.type");
        properties.add("schema.#.data-type");
        properties.add("schema.#.name");
        properties.add("schema.#.from");
        properties.add("schema.#.expr");
        properties.add("schema.#.proctime");
        properties.add("schema.#.rowtime.timestamps.type");
        properties.add("schema.#.rowtime.timestamps.from");
        properties.add("schema.#.rowtime.timestamps.class");
        properties.add("schema.#.rowtime.timestamps.serialized");
        properties.add("schema.#.rowtime.watermarks.type");
        properties.add("schema.#.rowtime.watermarks.class");
        properties.add("schema.#.rowtime.watermarks.serialized");
        properties.add("schema.#.rowtime.watermarks.delay");
        properties.add("schema.watermark.#.rowtime");
        properties.add("schema.watermark.#.strategy.expr");
        properties.add("schema.watermark.#.strategy.data-type");

        // Format configuration: every key under the format prefix is passed through.
        properties.add("format.*");
        return properties;
    }

    /**
     * Returns the connector version placed under {@code connector.version} in the required
     * context.
     *
     * @return the connector version string
     */
    protected abstract String getVersion();

    /**
     * Tells whether the concrete factory targets a streaming environment. The value is handed
     * to the {@link SchemaValidator} used by {@link #getValidatedProperties(Map)}.
     *
     * @return {@code true} for a streaming environment
     */
    protected abstract boolean isStreamEnvironment();

    /**
     * Wraps the given properties in a {@link DescriptorProperties} and runs the schema and
     * Pravega validators over them.
     *
     * @param map the raw table properties
     * @return the descriptor properties after both validators have run
     */
    protected DescriptorProperties getValidatedProperties(Map<String, String> map) {
        DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(map);
        new SchemaValidator(isStreamEnvironment(), true, true).validate(descriptorProperties);
        new PravegaValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    /**
     * Looks up a {@link SerializationSchemaFactory} matching the properties and asks it for a
     * serialization schema.
     *
     * @param map the table properties used both for the lookup and for schema creation
     * @return the serialization schema produced by the discovered factory
     */
    protected SerializationSchema<Row> getSerializationSchema(Map<String, String> map) {
        // The lookup is keyed by the raw factory class, so narrowing to Row is unchecked.
        @SuppressWarnings("unchecked")
        SerializationSchemaFactory<Row> formatFactory =
                TableFactoryService.find(SerializationSchemaFactory.class, map, getClass().getClassLoader());
        return formatFactory.createSerializationSchema(map);
    }

    /**
     * Looks up a {@link DeserializationSchemaFactory} matching the properties and asks it for a
     * deserialization schema.
     *
     * @param map the table properties used both for the lookup and for schema creation
     * @return the deserialization schema produced by the discovered factory
     */
    protected DeserializationSchema<Row> getDeserializationSchema(Map<String, String> map) {
        // The lookup is keyed by the raw factory class, so narrowing to Row is unchecked.
        @SuppressWarnings("unchecked")
        DeserializationSchemaFactory<Row> formatFactory =
                TableFactoryService.find(DeserializationSchemaFactory.class, map, getClass().getClassLoader());
        return formatFactory.createDeserializationSchema(map);
    }

    /**
     * Builds a {@link FlinkPravegaTableSource} from the given table properties.
     *
     * <p>The properties are validated, the reader configuration is parsed, every optional
     * reader setting that is present is applied to the reader builder, and each configured
     * stream is registered together with its stream cuts. Rowtime and proctime attributes
     * derived from the schema are set on the resulting source.
     *
     * @param map the table properties
     * @return the configured table source
     */
    public FlinkPravegaTableSource createFlinkPravegaTableSource(Map<String, String> map) {
        DescriptorProperties validatedProperties = getValidatedProperties(map);
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(validatedProperties.getTableSchema("schema"));
        DeserializationSchema<Row> deserializationSchema = getDeserializationSchema(map);

        ConnectorConfigurations config = new ConnectorConfigurations();
        config.parseConfigurations(validatedProperties, ConnectorConfigurations.ConfigurationType.READER);

        Pravega.TableSourceReaderBuilder readerBuilder = new Pravega().tableSourceReaderBuilder();
        readerBuilder.withDeserializationSchema(deserializationSchema);

        // Optional settings are only applied when the configuration provides them.
        config.getAssignerWithTimeWindows()
                .ifPresent(assigner -> readerBuilder.withTimestampAssigner(assigner));
        config.getUid().ifPresent(uid -> readerBuilder.uid(uid));
        config.getRgScope().ifPresent(scope -> readerBuilder.withReaderGroupScope(scope));
        config.getRgName().ifPresent(name -> readerBuilder.withReaderGroupName(name));
        config.getRefreshInterval()
                .ifPresent(millis -> readerBuilder.withReaderGroupRefreshTime(Time.milliseconds(millis)));
        config.getEventReadTimeoutInterval()
                .ifPresent(millis -> readerBuilder.withEventReadTimeout(Time.milliseconds(millis)));
        config.getCheckpointInitiateTimeoutInterval()
                .ifPresent(millis -> readerBuilder.withCheckpointInitiateTimeout(Time.milliseconds(millis)));

        readerBuilder.withPravegaConfig(config.getPravegaConfig());
        config.getMetrics().ifPresent(enabled -> readerBuilder.enableMetrics(enabled));

        for (StreamWithBoundaries boundaries : config.getReaderStreams()) {
            StreamCut from = boundaries.getFrom();
            StreamCut to = boundaries.getTo();
            if (to != StreamCut.UNBOUNDED) {
                // An end cut is always passed through, even when the start cut is unbounded;
                // otherwise a configured end cut would be dropped without notice.
                readerBuilder.forStream(boundaries.getStream(), from, to);
            } else if (from != StreamCut.UNBOUNDED) {
                readerBuilder.forStream(boundaries.getStream(), from);
            } else {
                readerBuilder.forStream(boundaries.getStream());
            }
        }

        FlinkPravegaTableSource tableSource = new FlinkPravegaTableSource(
                readerBuilder::buildSourceFunction,
                readerBuilder::buildInputFormat,
                physicalSchema);
        tableSource.setRowtimeAttributeDescriptors(SchemaValidator.deriveRowtimeAttributes(validatedProperties));
        SchemaValidator.deriveProctimeAttribute(validatedProperties)
                .ifPresent(tableSource::setProctimeAttribute);
        return tableSource;
    }

    /**
     * Builds a {@link FlinkPravegaTableSink} from the given table properties.
     *
     * <p>The properties are validated, the writer configuration is parsed, every optional
     * writer setting that is present is applied to the writer builder, and the routing key
     * field, serialization schema and target stream are set.
     *
     * @param map the table properties
     * @return the configured table sink
     */
    public FlinkPravegaTableSink createFlinkPravegaTableSink(Map<String, String> map) {
        DescriptorProperties validatedProperties = getValidatedProperties(map);
        TableSchema physicalSchema =
                TableSchemaUtils.getPhysicalSchema(validatedProperties.getTableSchema("schema"));
        SerializationSchema<Row> serializationSchema = getSerializationSchema(map);

        ConnectorConfigurations config = new ConnectorConfigurations();
        config.parseConfigurations(validatedProperties, ConnectorConfigurations.ConfigurationType.WRITER);

        Pravega.TableSinkWriterBuilder writerBuilder = new Pravega().tableSinkWriterBuilder();

        // Optional settings are only applied when the configuration provides them.
        config.getTxnLeaseRenewalInterval()
                .ifPresent(millis -> writerBuilder.withTxnLeaseRenewalPeriod(Time.milliseconds(millis)));
        config.getWriterMode().ifPresent(mode -> writerBuilder.withWriterMode(mode));
        config.getMetrics().ifPresent(enabled -> writerBuilder.enableMetrics(enabled));
        config.getWatermark().ifPresent(enabled -> writerBuilder.enableWatermark(enabled));

        writerBuilder.withPravegaConfig(config.getPravegaConfig());
        writerBuilder.withRoutingKeyField(config.getRoutingKey());
        writerBuilder.withSerializationSchema(serializationSchema);
        writerBuilder.forStream(config.getWriterStream());

        return new FlinkPravegaTableSink(
                writerBuilder::createSinkFunction,
                writerBuilder::createOutputFormat,
                physicalSchema);
    }
}
