package io.streamthoughts.jikkou.kafka.reconciler;

import io.streamthoughts.jikkou.core.annotation.SupportedResource;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.extension.ExtensionContext;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionOptionSpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionSpec;
import io.streamthoughts.jikkou.core.models.ResourceListObject;
import io.streamthoughts.jikkou.core.reconciler.Collector;
import io.streamthoughts.jikkou.core.selector.Selector;
import io.streamthoughts.jikkou.kafka.KafkaLabelAndAnnotations;
import io.streamthoughts.jikkou.kafka.collections.V1KafkaTopicList;
import io.streamthoughts.jikkou.kafka.internals.KafkaConfigPredicate;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaTopicService;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SupportedResource(type = V1KafkaTopic.class)
@ExtensionSpec(options = {@ExtensionOptionSpec(name = AdminClientKafkaTopicCollector.STATUS_CONFIG_NAME, description = AdminClientKafkaTopicCollector.STATUS_CONFIG_DESCRIPTION, type = Boolean.class, defaultValue = "false")})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconciler/AdminClientKafkaTopicCollector.class */
public final class AdminClientKafkaTopicCollector extends AdminClientKafkaConfigs implements Collector<V1KafkaTopic> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTopicCollector.class);
    public static final String STATUS_CONFIG_NAME = "status";
    public static final String STATUS_CONFIG_DESCRIPTION = "Specify whether to describe status information about the topic-partitions";
    private AdminClientContextFactory adminClientContextFactory;

    public AdminClientKafkaTopicCollector() {
    }

    public AdminClientKafkaTopicCollector(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    public void init(@NotNull ExtensionContext extensionContext) {
        super.init(extensionContext);
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(extensionContext.appConfiguration());
        }
    }

    public Optional<V1KafkaTopic> get(@NotNull String str, @NotNull Configuration configuration) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Listing all kafka topics with configuration: {}", configuration.asMap());
        }
        KafkaConfigPredicate kafkaConfigPredicate = kafkaConfigPredicate(configuration);
        AdminClientContext createAdminClientContext = this.adminClientContextFactory.createAdminClientContext();
        try {
            List<V1KafkaTopic> listAll = new KafkaTopicService(createAdminClientContext.getAdminClient()).listAll(Set.of(str), kafkaConfigPredicate, ((Boolean) extensionContext().configProperty(STATUS_CONFIG_NAME).get(configuration)).booleanValue());
            if (listAll.isEmpty()) {
                Optional<V1KafkaTopic> empty = Optional.empty();
                if (createAdminClientContext != null) {
                    createAdminClientContext.close();
                }
                return empty;
            }
            Optional<V1KafkaTopic> of = Optional.of(addClusterIdToMetadataAnnotations((V1KafkaTopic) listAll.getFirst(), createAdminClientContext.getClusterId()));
            if (createAdminClientContext != null) {
                createAdminClientContext.close();
            }
            return of;
        } catch (Throwable th) {
            if (createAdminClientContext != null) {
                try {
                    createAdminClientContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public ResourceListObject<V1KafkaTopic> listAll(@NotNull Configuration configuration, @NotNull Selector selector) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Listing all kafka topics with configuration: {}", configuration.asMap());
        }
        KafkaConfigPredicate kafkaConfigPredicate = kafkaConfigPredicate(configuration);
        AdminClientContext createAdminClientContext = this.adminClientContextFactory.createAdminClientContext();
        try {
            List<V1KafkaTopic> listAll = new KafkaTopicService(createAdminClientContext.getAdminClient()).listAll(kafkaConfigPredicate, ((Boolean) extensionContext().configProperty(STATUS_CONFIG_NAME).get(configuration)).booleanValue());
            if (LOG.isInfoEnabled()) {
                LOG.info("Found '{}' kafka topics matching the given selector(s).", Integer.valueOf(listAll.size()));
            }
            String clusterId = createAdminClientContext.getClusterId();
            Stream<V1KafkaTopic> stream = listAll.stream();
            Objects.requireNonNull(selector);
            V1KafkaTopicList v1KafkaTopicList = new V1KafkaTopicList(stream.filter((v1) -> {
                return r1.apply(v1);
            }).map(v1KafkaTopic -> {
                return addClusterIdToMetadataAnnotations(v1KafkaTopic, clusterId);
            }).toList());
            if (createAdminClientContext != null) {
                createAdminClientContext.close();
            }
            return v1KafkaTopicList;
        } catch (Throwable th) {
            if (createAdminClientContext != null) {
                try {
                    createAdminClientContext.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private V1KafkaTopic addClusterIdToMetadataAnnotations(V1KafkaTopic v1KafkaTopic, String str) {
        return v1KafkaTopic.toBuilder().withMetadata(v1KafkaTopic.getMetadata().toBuilder().withAnnotation(KafkaLabelAndAnnotations.JIKKOU_IO_KAFKA_CLUSTER_ID, str).build()).build();
    }
}
