package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsConfigProperties;
import io.streamthoughts.jikkou.annotation.AcceptsConfigProperty;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.annotation.AcceptsResources;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceCollector;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.error.JikkouRuntimeException;
import io.streamthoughts.jikkou.api.model.ObjectMeta;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.kafka.MetadataAnnotations;
import io.streamthoughts.jikkou.kafka.adapters.KafkaConfigsAdapter;
import io.streamthoughts.jikkou.kafka.converters.V1KafkaTopicListConverter;
import io.streamthoughts.jikkou.kafka.internals.ConfigsBuilder;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import io.streamthoughts.jikkou.kafka.internals.KafkaConfigPredicate;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicSpec;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ResourceCollector} that lists the topics of a Kafka cluster through the Kafka {@link AdminClient}
 * and exposes them as {@link V1KafkaTopic} resources.
 *
 * <p>The {@link AdminClientContextFactory} is either injected through the constructor or created
 * lazily from the configuration passed to {@link #configure(Configuration)}.
 */
@AcceptsResources({
    @AcceptsResource(type = V1KafkaTopic.class),
    @AcceptsResource(type = V1KafkaTopicList.class, converter = V1KafkaTopicListConverter.class)
})
@AcceptsConfigProperties({
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false),
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false),
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_CONFIG,
        description = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false)
})
public final class AdminClientKafkaTopicCollector implements ResourceCollector<V1KafkaTopic> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTopicCollector.class);

    /** Not final: may be assigned in {@link #configure(Configuration)} when the no-arg constructor is used. */
    private AdminClientContextFactory adminClientContextFactory;

    /**
     * Thin wrapper around an {@link AdminClient} that lists topics together with their
     * descriptions and configurations.
     */
    public static final class KafkaTopicsClient {
        /** Value set on every collected topic spec for its config-map references. */
        public static final Set<String> NO_CONFIG_MAP_REFS = Collections.emptySet();

        /** Sentinel returned when no single replication factor can be derived for a topic. */
        private static final int UNKNOWN_REPLICATION_FACTOR = -1;

        private final AdminClient client;

        /**
         * Creates a new client.
         *
         * @param adminClient the Kafka admin client used for all requests.
         */
        public KafkaTopicsClient(AdminClient adminClient) {
            this.client = adminClient;
        }

        /**
         * Lists every topic returned by {@code AdminClient#listTopics()} and builds one resource per topic.
         *
         * @param predicate passed to {@link KafkaConfigsAdapter#of} together with each topic's {@link Config}.
         * @return one {@link V1KafkaTopic} per described topic.
         * @throws JikkouRuntimeException if listing, describing or reading the configuration of topics fails.
         */
        public List<V1KafkaTopic> listAll(@NotNull Predicate<ConfigEntry> predicate) {
            Set<String> topicNames = AsyncUtils.getValueOrThrowException(
                    Futures.toCompletableFuture(this.client.listTopics().names()),
                    e -> new JikkouRuntimeException("Failed to list kafka topics", e));

            // Descriptions and configs are requested independently and joined by topic name.
            CompletableFuture<Map<String, TopicDescription>> descriptions = getDescriptionForTopics(topicNames);
            CompletableFuture<Map<String, Config>> configs = getConfigForTopics(topicNames);

            CompletableFuture<List<V1KafkaTopic>> topics = descriptions.thenCombine(
                    configs,
                    (descriptionsByName, configsByName) -> descriptionsByName.values().stream()
                            .map(description -> newTopicResources(
                                    description, configsByName.get(description.name()), predicate))
                            .toList());

            return AsyncUtils.getValueOrThrowException(
                    topics,
                    e -> new JikkouRuntimeException(
                            "Failed to retrieve kafka topic descriptions/or configurations.", e));
        }

        /**
         * Builds the resource for a single topic from its description and configuration.
         */
        private V1KafkaTopic newTopicResources(TopicDescription topicDescription,
                                               Config config,
                                               Predicate<ConfigEntry> predicate) {
            ObjectMeta metadata = ObjectMeta.builder()
                    .withName(topicDescription.name())
                    .build();
            V1KafkaTopicSpec spec = V1KafkaTopicSpec.builder()
                    .withPartitions(topicDescription.partitions().size())
                    .withReplicas((short) computeReplicationFactor(topicDescription))
                    .withConfigs(KafkaConfigsAdapter.of(config, predicate))
                    .withConfigMapRefs(NO_CONFIG_MAP_REFS)
                    .build();
            return V1KafkaTopic.builder()
                    .withMetadata(metadata)
                    .withSpec(spec)
                    .build();
        }

        /**
         * Returns the number of replicas shared by all partitions of the topic.
         *
         * @return the common replica count, or {@code -1} when partitions have differing
         *         replica counts or the description contains no partition at all.
         */
        private int computeReplicationFactor(TopicDescription topicDescription) {
            Iterator<TopicPartitionInfo> partitions = topicDescription.partitions().iterator();
            if (!partitions.hasNext()) {
                // Nothing to derive a replication factor from; avoid NoSuchElementException.
                return UNKNOWN_REPLICATION_FACTOR;
            }
            int replicationFactor = partitions.next().replicas().size();
            while (partitions.hasNext()) {
                if (partitions.next().replicas().size() != replicationFactor) {
                    return UNKNOWN_REPLICATION_FACTOR;
                }
            }
            return replicationFactor;
        }

        /**
         * Describes the TOPIC-level configuration of the given topics.
         *
         * @return a future of the configurations keyed by topic name.
         */
        private CompletableFuture<Map<String, Config>> getConfigForTopics(Collection<String> collection) {
            ConfigsBuilder configsBuilder = new ConfigsBuilder();
            collection.forEach(topicName ->
                    configsBuilder.newResourceConfig().setType(ConfigResource.Type.TOPIC).setName(topicName));
            return Futures.toCompletableFuture(this.client.describeConfigs(configsBuilder.build().keySet()).all())
                    .thenApply(configsByResource -> configsByResource.entrySet().stream()
                            .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue)));
        }

        /**
         * Describes the given topics.
         *
         * @return a future of the descriptions keyed by topic name.
         */
        private CompletableFuture<Map<String, TopicDescription>> getDescriptionForTopics(Collection<String> collection) {
            return Futures.toCompletableFuture(this.client.describeTopics(collection).allTopicNames());
        }
    }

    /**
     * Creates a collector whose {@link AdminClientContextFactory} is created by
     * {@link #configure(Configuration)}.
     */
    public AdminClientKafkaTopicCollector() {
    }

    /**
     * Creates a collector using the given factory.
     *
     * @param adminClientContextFactory the factory used to open an {@link AdminClientContext} per listing.
     */
    public AdminClientKafkaTopicCollector(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    /**
     * Configures this collector. A factory supplied through the constructor is kept; otherwise
     * a new one is created from the given configuration.
     *
     * @param configuration the configuration used to create the factory when none was injected.
     * @throws ConfigException declared by the signature; not thrown directly by this method body.
     */
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(configuration);
        }
    }

    /**
     * Lists the topics of the cluster, keeps those accepted by the given selectors and annotates
     * each of them with the cluster id.
     *
     * @param configuration the options controlling which kinds of config entries are described.
     * @param list          the selectors, combined through an {@link AggregateSelector}, used to filter topics.
     * @return the selected topics, each carrying the
     *         {@link MetadataAnnotations#JIKKOU_IO_KAFKA_CLUSTER_ID} annotation.
     * @throws IllegalStateException if no {@link AdminClientContextFactory} was injected or configured.
     */
    public List<V1KafkaTopic> listAll(@NotNull Configuration configuration, @NotNull List<ResourceSelector> list) {
        if (this.adminClientContextFactory == null) {
            throw new IllegalStateException(
                    "No AdminClientContextFactory available: configure(Configuration) must be called first");
        }

        ConfigDescribeConfiguration options = new ConfigDescribeConfiguration(configuration);
        if (LOG.isInfoEnabled()) {
            LOG.info("Listing all kafka topics using following options: {}", options.asConfiguration().asMap());
        }

        KafkaConfigPredicate configPredicate = new KafkaConfigPredicate()
                .withDynamicTopicConfig(true)
                .withDefaultConfig(options.isDescribeDefaultConfigs())
                .withDynamicBrokerConfig(options.isDescribeDynamicBrokerConfigs())
                .withStaticBrokerConfig(options.isDescribeStaticBrokerConfigs());

        // try-with-resources closes the context on both the normal and the exceptional path.
        try (AdminClientContext context = this.adminClientContextFactory.createAdminClientContext()) {
            List<V1KafkaTopic> allTopics = new KafkaTopicsClient(context.getAdminClient()).listAll(configPredicate);
            String clusterId = context.getClusterId();
            AggregateSelector selector = new AggregateSelector(list);

            List<V1KafkaTopic> selected = allTopics.stream()
                    .filter(selector::apply)
                    .map(topic -> topic.toBuilder()
                            .withMetadata(topic.getMetadata().toBuilder()
                                    .withAnnotation(MetadataAnnotations.JIKKOU_IO_KAFKA_CLUSTER_ID, clusterId)
                                    .build())
                            .build())
                    .toList();

            // Logged after filtering so the count really is the number of matching topics.
            if (LOG.isInfoEnabled()) {
                LOG.info("Found '{}' kafka topics matching the given selector(s).", selected.size());
            }
            return selected;
        }
    }
}
