package io.streamthoughts.jikkou.schema.registry.control;

import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceCollector;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.error.JikkouRuntimeException;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.common.utils.Tuple2;
import io.streamthoughts.jikkou.rest.client.RestClientException;
import io.streamthoughts.jikkou.schema.registry.V1SchemaRegistrySubjectFactory;
import io.streamthoughts.jikkou.schema.registry.api.AsyncSchemaRegistryApi;
import io.streamthoughts.jikkou.schema.registry.api.DefaultAsyncSchemaRegistryApi;
import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryApiFactory;
import io.streamthoughts.jikkou.schema.registry.api.SchemaRegistryClientConfig;
import io.streamthoughts.jikkou.schema.registry.api.data.SubjectSchemaVersion;
import io.streamthoughts.jikkou.schema.registry.model.CompatibilityLevels;
import io.streamthoughts.jikkou.schema.registry.models.V1SchemaRegistrySubject;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.jetbrains.annotations.NotNull;

@AcceptsResource(type = V1SchemaRegistrySubject.class)
/* loaded from: input_file:io/streamthoughts/jikkou/schema/registry/control/SchemaRegistrySubjectCollector.class */
public class SchemaRegistrySubjectCollector implements ResourceCollector<V1SchemaRegistrySubject> {
    private SchemaRegistryClientConfig config;
    private boolean prettyPrintSchema = true;
    private boolean defaultToGlobalCompatibilityLevel = true;
    private V1SchemaRegistrySubjectFactory schemaRegistrySubjectFactory;

    public SchemaRegistrySubjectCollector() {
    }

    public SchemaRegistrySubjectCollector(SchemaRegistryClientConfig schemaRegistryClientConfig) {
        configure(schemaRegistryClientConfig);
    }

    @Override // io.streamthoughts.jikkou.api.config.Configurable
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        configure(new SchemaRegistryClientConfig(configuration));
    }

    private void configure(@NotNull SchemaRegistryClientConfig schemaRegistryClientConfig) throws ConfigException {
        this.config = schemaRegistryClientConfig;
        this.schemaRegistrySubjectFactory = new V1SchemaRegistrySubjectFactory(schemaRegistryClientConfig.getSchemaRegistryVendor(), schemaRegistryClientConfig.getSchemaRegistryUrl(), this.prettyPrintSchema);
    }

    @Override // io.streamthoughts.jikkou.api.control.ResourceCollector
    public List<V1SchemaRegistrySubject> listAll(@NotNull Configuration configuration, @NotNull List<ResourceSelector> list) {
        DefaultAsyncSchemaRegistryApi defaultAsyncSchemaRegistryApi = new DefaultAsyncSchemaRegistryApi(SchemaRegistryApiFactory.create(this.config));
        try {
            CompletableFuture<U> thenComposeAsync = defaultAsyncSchemaRegistryApi.listSubjects().thenComposeAsync(list2 -> {
                return AsyncUtils.waitForAll(getAllSchemaRegistrySubjectsAsync(list2, defaultAsyncSchemaRegistryApi));
            });
            Optional<Throwable> exception = AsyncUtils.getException(thenComposeAsync);
            if (exception.isPresent()) {
                throw new JikkouRuntimeException("Failed to list all schema registry subject versions", exception.get());
            }
            List<V1SchemaRegistrySubject> list3 = (List) thenComposeAsync.join();
            defaultAsyncSchemaRegistryApi.close();
            return list3;
        } catch (Throwable th) {
            defaultAsyncSchemaRegistryApi.close();
            throw th;
        }
    }

    public SchemaRegistrySubjectCollector prettyPrintSchema(boolean z) {
        this.prettyPrintSchema = z;
        return this;
    }

    public SchemaRegistrySubjectCollector defaultToGlobalCompatibilityLevel(boolean z) {
        this.defaultToGlobalCompatibilityLevel = z;
        return this;
    }

    @NotNull
    private List<CompletableFuture<V1SchemaRegistrySubject>> getAllSchemaRegistrySubjectsAsync(List<String> list, AsyncSchemaRegistryApi asyncSchemaRegistryApi) {
        return list.stream().map(str -> {
            return getSchemaRegistrySubjectAsync(asyncSchemaRegistryApi, str, this.defaultToGlobalCompatibilityLevel, this.schemaRegistrySubjectFactory);
        }).toList();
    }

    @NotNull
    private static CompletableFuture<V1SchemaRegistrySubject> getSchemaRegistrySubjectAsync(@NotNull AsyncSchemaRegistryApi asyncSchemaRegistryApi, @NotNull String str, boolean z, @NotNull V1SchemaRegistrySubjectFactory v1SchemaRegistrySubjectFactory) {
        return asyncSchemaRegistryApi.getLatestSubjectSchema(str).thenCompose(subjectSchemaVersion -> {
            return asyncSchemaRegistryApi.getSubjectCompatibilityLevel(str, z).thenApply(compatibilityLevelObject -> {
                return CompatibilityLevels.valueOf(compatibilityLevelObject.compatibilityLevel());
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                if (th.getCause() != null) {
                    th = th.getCause();
                }
                if ((th instanceof RestClientException) && ((RestClientException) th).response().getStatus() == 404) {
                    return null;
                }
                if (th instanceof RuntimeException) {
                    throw ((RuntimeException) th);
                }
                throw new JikkouRuntimeException(th);
            }).thenApply(compatibilityLevels -> {
                return Tuple2.of(subjectSchemaVersion, compatibilityLevels);
            });
        }).thenApply((Function<? super U, ? extends U>) tuple2 -> {
            return v1SchemaRegistrySubjectFactory.createSchemaRegistrySubject((SubjectSchemaVersion) tuple2._1(), (CompatibilityLevels) tuple2._2());
        });
    }
}
