package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.annotation.AcceptsConfigProperties;
import io.streamthoughts.jikkou.annotation.AcceptsConfigProperty;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.annotation.AcceptsResources;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceCollector;
import io.streamthoughts.jikkou.api.error.JikkouRuntimeException;
import io.streamthoughts.jikkou.api.model.ObjectMeta;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.kafka.AdminClientContext;
import io.streamthoughts.jikkou.kafka.MetadataAnnotations;
import io.streamthoughts.jikkou.kafka.adapters.KafkaConfigsAdapter;
import io.streamthoughts.jikkou.kafka.adapters.V1KafkaTopicSupport;
import io.streamthoughts.jikkou.kafka.converters.V1KafkaTopicListConverter;
import io.streamthoughts.jikkou.kafka.internals.ConfigsBuilder;
import io.streamthoughts.jikkou.kafka.internals.KafkaConfigPredicate;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicSpec;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Collects the topics of a Kafka cluster through the {@link AdminClient} and exposes them as
 * {@link V1KafkaTopic} resources.
 *
 * <p>Which configuration entries are attached to each topic is driven by the
 * {@link ConfigDescribeConfiguration} options declared in the annotations below.
 */
@AcceptsResources({
    @AcceptsResource(type = V1KafkaTopic.class),
    @AcceptsResource(type = V1KafkaTopicList.class, converter = V1KafkaTopicListConverter.class)
})
@AcceptsConfigProperties({
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class),
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class),
    @AcceptsConfigProperty(
        name = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_CONFIG,
        description = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class)
})
public final class AdminClientKafkaTopicCollector extends AbstractAdminClientKafkaController implements ResourceCollector<V1KafkaTopic> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaTopicCollector.class);

    /**
     * Thin wrapper around an {@link AdminClient} that lists topics together with their
     * partition layout and configuration.
     */
    public static final class KafkaTopicsClient {
        /** Empty set used for the {@code configMapRefs} of every topic built by this client. */
        public static final Set<String> NO_CONFIG_MAP_REFS = Collections.emptySet();
        private final AdminClient client;

        /**
         * Creates a new client.
         *
         * @param adminClient the admin client used for every request issued by this instance.
         */
        public KafkaTopicsClient(AdminClient adminClient) {
            this.client = adminClient;
        }

        /**
         * Lists every topic returned by {@code AdminClient#listTopics()} and describes each one.
         *
         * @param predicate the filter handed to {@link KafkaConfigsAdapter#of} to select the
         *                  configuration entries attached to each topic.
         * @return one {@link V1KafkaTopic} per described topic.
         * @throws JikkouRuntimeException if the calling thread is interrupted (the interrupt flag
         *                                is restored first) or if any admin request fails.
         */
        public List<V1KafkaTopic> listAll(@NotNull Predicate<ConfigEntry> predicate) {
            try {
                Set<String> listedNames = this.client.listTopics().names().get();
                List<String> topicNames = V1KafkaTopicSupport.stream(listedNames)
                    .map(topic -> topic.getMetadata().getName())
                    .toList();

                // Descriptions and configs are fetched concurrently and joined on the topic name.
                CompletableFuture<List<V1KafkaTopic>> resources = describeTopics(topicNames)
                    .thenCombine(describeConfigs(topicNames), (descriptions, configs) ->
                        descriptions.values().stream()
                            .map(description ->
                                newTopicResources(description, configs.get(description.name()), predicate))
                            .toList());
                return resources.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JikkouRuntimeException(e);
            } catch (ExecutionException e) {
                throw new JikkouRuntimeException(e);
            }
        }

        /**
         * Builds the resource for a single topic.
         *
         * @param topicDescription the description providing the topic name and partitions.
         * @param config           the configuration described for the topic.
         * @param predicate        the filter applied to the configuration entries.
         * @return the topic resource, with {@link #NO_CONFIG_MAP_REFS} as config-map references.
         */
        private V1KafkaTopic newTopicResources(TopicDescription topicDescription, Config config, Predicate<ConfigEntry> predicate) {
            ObjectMeta metadata = ObjectMeta.builder()
                .withName(topicDescription.name())
                .build();
            V1KafkaTopicSpec spec = V1KafkaTopicSpec.builder()
                .withPartitions(topicDescription.partitions().size())
                .withReplicas((short) computeReplicationFactor(topicDescription))
                .withConfigs(KafkaConfigsAdapter.of(config, predicate))
                .withConfigMapRefs(NO_CONFIG_MAP_REFS)
                .build();
            return V1KafkaTopic.builder()
                .withMetadata(metadata)
                .withSpec(spec)
                .build();
        }

        /**
         * Computes the replication factor shared by all partitions of a topic.
         *
         * @param topicDescription the topic description to inspect.
         * @return the number of replicas when every partition has the same count; {@code -1}
         *         when partitions disagree or when the description has no partition at all.
         */
        private int computeReplicationFactor(TopicDescription topicDescription) {
            int factor = -1;
            boolean first = true;
            for (TopicPartitionInfo partition : topicDescription.partitions()) {
                int replicas = partition.replicas().size();
                if (first) {
                    factor = replicas;
                    first = false;
                } else if (replicas != factor) {
                    // Partitions disagree: no single replication factor can be reported.
                    return -1;
                }
            }
            return factor;
        }

        /**
         * Asynchronously describes the configuration of the given topics.
         *
         * @param collection the names of the topics to describe.
         * @return a future holding the described {@link Config} keyed by topic name; it completes
         *         exceptionally with a {@link JikkouRuntimeException} if the request fails or the
         *         worker thread is interrupted.
         */
        private CompletableFuture<Map<String, Config>> describeConfigs(Collection<String> collection) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    ConfigsBuilder builder = new ConfigsBuilder();
                    collection.forEach(topicName ->
                        builder.newResourceConfig().setType(ConfigResource.Type.TOPIC).setName(topicName));

                    Map<ConfigResource, Config> described =
                        this.client.describeConfigs(builder.build().keySet()).all().get();
                    return described.entrySet().stream()
                        .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouRuntimeException(e);
                } catch (ExecutionException e) {
                    throw new JikkouRuntimeException(e);
                }
            });
        }

        /**
         * Asynchronously describes the given topics.
         *
         * @param collection the names of the topics to describe.
         * @return a future holding the {@link TopicDescription} keyed by topic name; it completes
         *         exceptionally with a {@link JikkouRuntimeException} if the request fails or the
         *         worker thread is interrupted.
         */
        private CompletableFuture<Map<String, TopicDescription>> describeTopics(Collection<String> collection) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return this.client.describeTopics(collection).allTopicNames().get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouRuntimeException(e);
                } catch (ExecutionException e) {
                    throw new JikkouRuntimeException(e);
                }
            });
        }
    }

    /** Creates a collector that has not been configured yet. */
    public AdminClientKafkaTopicCollector() {
    }

    /**
     * Creates a collector and immediately configures it.
     *
     * @param configuration the configuration passed to {@code configure}.
     */
    public AdminClientKafkaTopicCollector(@NotNull Configuration configuration) {
        configure(configuration);
    }

    /**
     * Creates a collector bound to an existing admin client context.
     *
     * @param adminClientContext the context handed to the parent controller.
     */
    public AdminClientKafkaTopicCollector(@NotNull AdminClientContext adminClientContext) {
        super(adminClientContext);
    }

    /**
     * Lists the topics of the cluster that are accepted by the given selectors.
     *
     * <p>Each returned topic carries the
     * {@link MetadataAnnotations#JIKKOU_IO_KAFKA_CLUSTER_ID} annotation set to the cluster id
     * reported by the admin client context.
     *
     * @param configuration the options read through {@link ConfigDescribeConfiguration} to decide
     *                      which kinds of configuration entries are described.
     * @param list          the selectors, combined through an {@link AggregateSelector}, that a
     *                      topic must satisfy to be returned.
     * @return the selected topics.
     */
    public List<V1KafkaTopic> listAll(@NotNull Configuration configuration, @NotNull List<ResourceSelector> list) {
        ConfigDescribeConfiguration options = new ConfigDescribeConfiguration(configuration);
        if (LOG.isInfoEnabled()) {
            LOG.info("Listing all kafka topics using following options: {}", options.asConfiguration().asMap());
        }

        // Dynamic topic configs are always described; the other kinds are opt-in.
        KafkaConfigPredicate configPredicate = new KafkaConfigPredicate()
            .withDynamicTopicConfig(true)
            .withDefaultConfig(options.isDescribeDefaultConfigs())
            .withDynamicBrokerConfig(options.isDescribeDynamicBrokerConfigs())
            .withStaticBrokerConfig(options.isDescribeStaticBrokerConfigs());

        List<V1KafkaTopic> allTopics = this.adminClientContext.invoke(
            adminClient -> new KafkaTopicsClient(adminClient).listAll(configPredicate));

        AggregateSelector selector = new AggregateSelector(list);
        List<V1KafkaTopic> selected = allTopics.stream()
            .filter(selector::apply)
            .toList();

        // Logged after filtering so that the count really is the number of matching topics.
        if (LOG.isInfoEnabled()) {
            LOG.info("Found '{}' kafka topics matching the given selector(s).", selected.size());
        }

        String clusterId = this.adminClientContext.getClusterId();
        return selected.stream()
            .map(topic -> topic.toBuilder()
                .withMetadata(topic.getMetadata().toBuilder()
                    .withAnnotation(MetadataAnnotations.JIKKOU_IO_KAFKA_CLUSTER_ID, clusterId)
                    .build())
                .build())
            .toList();
    }
}
