package io.pravega.connectors.flink.table.catalog.pravega;

import io.pravega.client.ClientConfig;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.DeleteScopeFailedException;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.dynamic.table.FlinkPravegaDynamicTableFactory;
import io.pravega.connectors.flink.dynamic.table.PravegaOptions;
import io.pravega.connectors.flink.formats.registry.PravegaRegistryFormatFactory;
import io.pravega.connectors.flink.formats.registry.PravegaRegistryOptions;
import io.pravega.connectors.flink.table.catalog.pravega.util.PravegaSchemaUtils;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
import io.pravega.schemaregistry.client.SchemaRegistryClientFactory;
import io.pravega.schemaregistry.contract.data.Compatibility;
import io.pravega.schemaregistry.contract.data.GroupProperties;
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.shared.NameUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flink catalog backed by a Pravega cluster and a Pravega Schema Registry.
 *
 * <p>Databases are exposed as Pravega scopes. A table is a Pravega stream in that scope which
 * also has a group of the same name registered in the schema registry (using the scope name as
 * the registry namespace). Views, partitions, functions and statistics are not supported.
 *
 * <p>The schema registry client is bound to one namespace at a time and is re-created whenever
 * an operation targets a different database, so instances hold mutable connection state.
 */
public class PravegaCatalog extends AbstractCatalog {
    private static final Logger LOG = LoggerFactory.getLogger(PravegaCatalog.class);

    /** Client towards the Pravega controller; {@code null} until {@link #open()} is called. */
    private StreamManager streamManager;
    /** Registry client bound to the namespace of the most recently accessed database. */
    private SchemaRegistryClient schemaRegistryClient;
    private final ClientConfig clientConfig;
    private final SchemaRegistryClientConfig config;
    /** Base table options; copied (never mutated) when building a table in {@link #getTable}. */
    private final Map<String, String> properties;
    private final SerializationFormat serializationFormat;

    /**
     * Creates the catalog without connecting to any service; call {@link #open()} before use.
     *
     * @param catalogName         name of this catalog
     * @param defaultDatabase     default database, must be an existing scope when opening
     * @param properties          base options added to every table returned by this catalog
     * @param pravegaConfig       source of the Pravega and schema registry client configuration
     * @param serializationFormat name of a {@link SerializationFormat} constant
     * @throws IllegalArgumentException if {@code serializationFormat} names no enum constant
     */
    public PravegaCatalog(String catalogName, String defaultDatabase, Map<String, String> properties,
                          PravegaConfig pravegaConfig, String serializationFormat) {
        super(catalogName, defaultDatabase);
        this.clientConfig = pravegaConfig.getClientConfig();
        this.config = SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
        this.serializationFormat = SerializationFormat.valueOf(serializationFormat);
        this.properties = properties;
        LOG.info("Created Pravega Catalog {}", catalogName);
    }

    /**
     * Connects to the Pravega controller and the schema registry (default database namespace).
     *
     * @throws CatalogException if a connection cannot be created or the default database
     *                          does not exist
     */
    public void open() throws CatalogException {
        if (this.streamManager == null) {
            try {
                this.streamManager = StreamManager.create(this.clientConfig);
            } catch (Exception e) {
                // Keep the cause so the underlying connection failure is diagnosable.
                throw new CatalogException("Failed to connect Pravega controller", e);
            }
        }
        LOG.info("Connected to Pravega controller");

        if (!databaseExists(getDefaultDatabase())) {
            throw new CatalogException(String.format(
                    "Configured default database %s doesn't exist in catalog %s.",
                    getDefaultDatabase(), getName()));
        }

        if (this.schemaRegistryClient == null) {
            try {
                this.schemaRegistryClient =
                        SchemaRegistryClientFactory.withNamespace(getDefaultDatabase(), this.config);
            } catch (Exception e) {
                throw new CatalogException("Failed to connect Pravega Schema Registry", e);
            }
        }
        LOG.info("Connected to Pravega Schema Registry");
    }

    /**
     * Closes both connections. The schema registry client is closed even when closing the
     * stream manager fails, and both fields are reset so the catalog can be re-opened.
     *
     * @throws CatalogException if the schema registry client fails to close
     */
    public void close() throws CatalogException {
        try {
            closeStreamManager();
        } finally {
            closeSchemaRegistryClient();
        }
    }

    /** Closes the stream manager, if any, and clears the field. */
    private void closeStreamManager() {
        if (this.streamManager == null) {
            return;
        }
        try {
            this.streamManager.close();
            LOG.info("Close connection to Pravega");
        } finally {
            this.streamManager = null;
        }
    }

    /** Closes the schema registry client, if any, and clears the field even on failure. */
    private void closeSchemaRegistryClient() {
        if (this.schemaRegistryClient == null) {
            return;
        }
        try {
            this.schemaRegistryClient.close();
            LOG.info("Close connection to Pravega Schema registry");
        } catch (Exception e) {
            throw new CatalogException("Fail to close connection to Pravega Schema registry", e);
        } finally {
            this.schemaRegistryClient = null;
        }
    }

    /** Returns the dynamic table factory used for tables of this catalog. */
    public Optional<Factory> getFactory() {
        return Optional.of(new FlinkPravegaDynamicTableFactory());
    }

    /**
     * Lists all scopes whose name does not start with the Pravega internal name prefix.
     *
     * @return names of the visible databases
     */
    public List<String> listDatabases() throws CatalogException {
        return streamOf(this.streamManager.listScopes())
                .filter(scope -> !scope.startsWith(NameUtils.INTERNAL_NAME_PREFIX))
                .collect(Collectors.toList());
    }

    /**
     * Returns an empty database descriptor (no properties, no comment) for an existing scope.
     *
     * @throws DatabaseNotExistException if the scope does not exist
     */
    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (!databaseExists(databaseName)) {
            throw new DatabaseNotExistException(getName(), databaseName);
        }
        return new CatalogDatabaseImpl(Collections.emptyMap(), (String) null);
    }

    /** Returns whether a scope with the given name exists. */
    public boolean databaseExists(String databaseName) throws CatalogException {
        return this.streamManager.checkScopeExists(databaseName);
    }

    /**
     * Creates a scope for the database. The {@code database} descriptor itself is not used.
     *
     * @throws DatabaseAlreadyExistException if the scope exists and {@code ignoreIfExists} is false
     */
    public void createDatabase(String databaseName, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException {
        if (databaseExists(databaseName)) {
            if (!ignoreIfExists) {
                throw new DatabaseAlreadyExistException(getName(), databaseName);
            }
            return;
        }
        this.streamManager.createScope(databaseName);
    }

    /**
     * Drops the scope; with {@code cascade} the registry groups of its tables are removed first
     * and the scope deletion is forced.
     *
     * @throws DatabaseNotExistException if the scope is missing and {@code ignoreIfNotExists} is false
     * @throws DatabaseNotEmptyException if the database has tables and {@code cascade} is false
     * @throws CatalogException          if Pravega fails to delete the scope
     */
    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
        if (!databaseExists(databaseName)) {
            if (!ignoreIfNotExists) {
                throw new DatabaseNotExistException(getName(), databaseName);
            }
            return;
        }

        // listTables() re-binds the registry client to this database's namespace, so one call
        // is enough both to obtain the tables and to prepare the client for removeGroup().
        List<String> tables = listTables(databaseName);
        if (!tables.isEmpty()) {
            if (!cascade) {
                throw new DatabaseNotEmptyException(getName(), databaseName);
            }
            tables.forEach(this.schemaRegistryClient::removeGroup);
        }

        try {
            this.streamManager.deleteScope(databaseName, cascade);
        } catch (DeleteScopeFailedException e) {
            throw new CatalogException(String.format("Failed to drop database %s", databaseName), e);
        }
    }

    /** Not supported. */
    public void alterDatabase(String databaseName, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
            throws DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Lists the non-internal streams of the scope that also have a schema registry group of the
     * same name in the scope's namespace.
     *
     * @throws DatabaseNotExistException if the scope does not exist
     */
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
        if (!databaseExists(databaseName)) {
            throw new DatabaseNotExistException(getName(), databaseName);
        }

        changeRegistryNamespace(databaseName);
        Set<String> registeredGroups = new HashSet<>();
        this.schemaRegistryClient.listGroups()
                .forEachRemaining(group -> registeredGroups.add(group.getKey()));

        return streamOf(this.streamManager.listStreams(databaseName))
                .map(pravegaStream -> pravegaStream.getStreamName())
                .filter(streamName -> !streamName.startsWith(NameUtils.INTERNAL_NAME_PREFIX))
                .filter(registeredGroups::contains)
                .collect(Collectors.toList());
    }

    /** Views are not supported; always returns an empty list. */
    public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /**
     * Builds the table from the latest schema of the registry group named after the stream.
     * The returned options are a copy of the catalog's base properties extended with the scope,
     * stream and registry coordinates of the table.
     *
     * @throws TableNotExistException if the stream does not exist
     */
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
        if (!tableExists(tablePath)) {
            throw new TableNotExistException(getName(), tablePath);
        }
        String scope = tablePath.getDatabaseName();
        String stream = tablePath.getObjectName();

        changeRegistryNamespace(scope);
        ResolvedSchema resolvedSchema = PravegaSchemaUtils.schemaInfoToResolvedSchema(
                this.schemaRegistryClient.getLatestSchemaVersion(stream, (String) null).getSchemaInfo());

        // Work on a copy: writing into the shared map would leak one table's options into
        // every other table handed out by this catalog.
        Map<String, String> options = new HashMap<>(this.properties);
        options.put(PravegaOptions.SCOPE.key(), scope);
        options.put(PravegaOptions.SCAN_STREAMS.key(), stream);
        options.put(PravegaOptions.SINK_STREAM.key(), stream);
        options.put(
                String.format("%s.%s", PravegaRegistryFormatFactory.IDENTIFIER, PravegaRegistryOptions.NAMESPACE.key()),
                scope);
        options.put(
                String.format("%s.%s", PravegaRegistryFormatFactory.IDENTIFIER, PravegaRegistryOptions.GROUP_ID.key()),
                stream);
        return new CatalogTableImpl(TableSchema.fromResolvedSchema(resolvedSchema), options, "");
    }

    /** Returns whether the stream named by the path exists in the scope named by the path. */
    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
        return this.streamManager.checkStreamExists(tablePath.getDatabaseName(), tablePath.getObjectName());
    }

    /**
     * Seals and deletes the stream, then removes its schema registry group.
     *
     * @throws TableNotExistException if the stream is missing and {@code ignoreIfNotExists} is false
     * @throws CatalogException       if the registry group cannot be removed
     */
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        if (!tableExists(tablePath)) {
            if (!ignoreIfNotExists) {
                throw new TableNotExistException(getName(), tablePath);
            }
            return;
        }
        String scope = tablePath.getDatabaseName();
        String stream = tablePath.getObjectName();

        this.streamManager.sealStream(scope, stream);
        this.streamManager.deleteStream(scope, stream);

        changeRegistryNamespace(scope);
        try {
            this.schemaRegistryClient.removeGroup(stream);
        } catch (Exception e) {
            throw new CatalogException(String.format("Fail to drop table %s/%s", scope, stream), e);
        }
    }

    /** Not supported. */
    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
            throws TableNotExistException, TableAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates a stream with the default configuration, registers a schema registry group of
     * the same name and adds the table schema to that group.
     *
     * @throws DatabaseNotExistException  if the scope does not exist
     * @throws TableAlreadyExistException if the stream exists and {@code ignoreIfExists} is false
     */
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        String scope = tablePath.getDatabaseName();
        String stream = tablePath.getObjectName();

        if (!databaseExists(scope)) {
            throw new DatabaseNotExistException(getName(), scope);
        }
        if (tableExists(tablePath)) {
            if (!ignoreIfExists) {
                throw new TableAlreadyExistException(getName(), tablePath);
            }
            return;
        }

        this.streamManager.createStream(scope, stream, StreamConfiguration.builder().build());

        changeRegistryNamespace(scope);
        this.schemaRegistryClient.addGroup(
                stream, new GroupProperties(this.serializationFormat, Compatibility.allowAny(), true));
        this.schemaRegistryClient.addSchema(
                stream, PravegaSchemaUtils.tableSchemaToSchemaInfo(table.getSchema(), this.serializationFormat));
    }

    /** Not supported. */
    public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // ------------------------------------------------------------------------------------------
    // Partitions: not supported, queries report "nothing there", mutations are rejected.
    // ------------------------------------------------------------------------------------------

    /** Always returns an empty list. */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
            throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /** Always returns an empty list. */
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /** Always returns an empty list. */
    public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters)
            throws TableNotExistException, TableNotPartitionedException, CatalogException {
        return Collections.emptyList();
    }

    /** Always throws {@link PartitionNotExistException}. */
    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
    }

    /** Always returns {@code false}. */
    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws CatalogException {
        return false;
    }

    /** Not supported. */
    public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                                CatalogPartition partition, boolean ignoreIfExists)
            throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException,
            PartitionAlreadyExistsException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                               CatalogPartition newPartition, boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // ------------------------------------------------------------------------------------------
    // Functions: not supported.
    // ------------------------------------------------------------------------------------------

    /** Always returns an empty list. */
    public List<String> listFunctions(String databaseName) throws DatabaseNotExistException, CatalogException {
        return Collections.emptyList();
    }

    /** Always throws {@link FunctionNotExistException}. */
    public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
        throw new FunctionNotExistException(getName(), functionPath);
    }

    /** Always returns {@code false}. */
    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
        return false;
    }

    /** Not supported. */
    public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
            throws FunctionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    // ------------------------------------------------------------------------------------------
    // Statistics: always UNKNOWN, updates are rejected.
    // ------------------------------------------------------------------------------------------

    /** Always returns {@link CatalogTableStatistics#UNKNOWN}. */
    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /** Always returns {@link CatalogColumnStatistics#UNKNOWN}. */
    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /** Always returns {@link CatalogTableStatistics#UNKNOWN}. */
    public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        return CatalogTableStatistics.UNKNOWN;
    }

    /** Always returns {@link CatalogColumnStatistics#UNKNOWN}. */
    public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath,
                                                                CatalogPartitionSpec partitionSpec)
            throws PartitionNotExistException, CatalogException {
        return CatalogColumnStatistics.UNKNOWN;
    }

    /** Not supported. */
    public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics,
                                     boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics,
                                           boolean ignoreIfNotExists)
            throws TableNotExistException, CatalogException, TablePartitionedException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                                         CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /** Not supported. */
    public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec,
                                               CatalogColumnStatistics columnStatistics,
                                               boolean ignoreIfNotExists)
            throws PartitionNotExistException, CatalogException {
        throw new UnsupportedOperationException();
    }

    /**
     * Replaces the schema registry client with one bound to the given namespace, closing the
     * previous client first (if there is one).
     *
     * @param namespace registry namespace to switch to (the database / scope name)
     * @throws CatalogException if closing the old client or creating the new one fails
     */
    private void changeRegistryNamespace(String namespace) {
        try {
            if (this.schemaRegistryClient != null) {
                this.schemaRegistryClient.close();
            }
            this.schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace, this.config);
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed to switch Pravega Schema Registry client to namespace %s", namespace), e);
        }
    }

    /** Wraps a one-shot iterator into a sequential stream of unknown size. */
    private static <T> Stream<T> streamOf(Iterator<T> iterator) {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false);
    }
}
