package io.streamthoughts.jikkou.kafka.reconcilier;

import io.streamthoughts.jikkou.core.annotation.HandledResource;
import io.streamthoughts.jikkou.core.config.Configuration;
import io.streamthoughts.jikkou.core.exceptions.ConfigException;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.extension.annotations.ConfigPropertySpec;
import io.streamthoughts.jikkou.core.extension.annotations.ExtensionConfigProperties;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.core.models.ResourceListObject;
import io.streamthoughts.jikkou.core.reconcilier.Collector;
import io.streamthoughts.jikkou.core.selectors.AggregateSelector;
import io.streamthoughts.jikkou.core.selectors.Selector;
import io.streamthoughts.jikkou.kafka.MetadataAnnotations;
import io.streamthoughts.jikkou.kafka.adapters.KafkaConfigsAdapter;
import io.streamthoughts.jikkou.kafka.collections.V1KafkaBrokerList;
import io.streamthoughts.jikkou.kafka.internals.ConfigsBuilder;
import io.streamthoughts.jikkou.kafka.internals.KafkaConfigPredicate;
import io.streamthoughts.jikkou.kafka.internals.KafkaUtils;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContext;
import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientContextFactory;
import io.streamthoughts.jikkou.kafka.models.V1KafkaBroker;
import io.streamthoughts.jikkou.kafka.models.V1KafkaBrokersSpec;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Collector} that lists the brokers of a Kafka cluster as {@link V1KafkaBroker}
 * resources, using the Kafka {@link AdminClient}.
 *
 * <p>Which broker config entries are kept on each resource is driven by the three boolean
 * properties read through {@link ConfigDescribeConfiguration} (default, dynamic-broker and
 * static-broker configs).
 */
@HandledResource(type = V1KafkaBroker.class)
@ExtensionConfigProperties(properties = {
    @ConfigPropertySpec(
        name = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DEFAULT_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false),
    @ConfigPropertySpec(
        name = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_NAME,
        description = ConfigDescribeConfiguration.DESCRIBE_DYNAMIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false),
    @ConfigPropertySpec(
        name = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_CONFIG,
        description = ConfigDescribeConfiguration.DESCRIBE_STATIC_BROKER_CONFIGS_PROPERTY_DESC,
        defaultValue = "false",
        type = Boolean.class,
        isRequired = false)
})
public final class AdminClientKafkaBrokerCollector implements Collector<V1KafkaBroker> {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientKafkaBrokerCollector.class);

    /** Factory for admin-client contexts; injected via constructor or created in {@link #configure}. */
    private AdminClientContextFactory adminClientContextFactory;

    /**
     * Thin client that turns the answers of an {@link AdminClient} into {@link V1KafkaBroker}
     * objects.
     */
    public final class KafkaBrokerClient {
        private final AdminClient client;

        /**
         * Creates a new client.
         *
         * @param adminClient the admin client used for every cluster request.
         */
        public KafkaBrokerClient(AdminClient adminClient) {
            this.client = adminClient;
        }

        /**
         * Lists every broker of the cluster together with its configuration.
         *
         * @param predicate filter handed to {@link KafkaConfigsAdapter#of} to select the config
         *                  entries of each broker.
         * @return one {@link V1KafkaBroker} per node reported by the cluster description.
         * @throws JikkouRuntimeException if waiting for the result is interrupted (the interrupt
         *                                flag is restored) or if one of the lookups fails.
         */
        public List<V1KafkaBroker> listAll(Predicate<ConfigEntry> predicate) {
            Collection<String> brokerIds = loadClusterBrokerIds(this.client);
            // Both lookups are started before waiting on either of them.
            CompletableFuture<Map<String, Node>> futureNodes = describeCluster();
            CompletableFuture<Map<String, Config>> futureConfigs = describeConfigs(brokerIds);
            CompletableFuture<List<V1KafkaBroker>> futureBrokers = futureNodes.thenCombine(
                futureConfigs,
                (nodesById, configsById) -> nodesById.values().stream()
                    .map(node -> toBroker(node, configsById.get(node.idString()), predicate))
                    .collect(Collectors.toList()));
            try {
                return futureBrokers.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JikkouRuntimeException(e);
            } catch (ExecutionException e) {
                throw new JikkouRuntimeException(e);
            }
        }

        /** Builds the resource for a single node from its description and its config. */
        private V1KafkaBroker toBroker(Node node, Config config, Predicate<ConfigEntry> predicate) {
            String brokerId = node.idString();
            return V1KafkaBroker.builder()
                .withMetadata(ObjectMeta.builder()
                    .withName(brokerId)
                    .build())
                .withSpec(V1KafkaBrokersSpec.builder()
                    .withId(brokerId)
                    .withRack(node.rack())
                    .withHost(node.host())
                    .withPort(node.port())
                    .withConfigs(KafkaConfigsAdapter.of(config, predicate))
                    .build())
                .build();
        }

        /**
         * Asynchronously describes the BROKER config resource of each given broker id.
         *
         * @param collection the ids of the brokers to describe.
         * @return a future holding the described configs keyed by config-resource name.
         */
        private CompletableFuture<Map<String, Config>> describeConfigs(Collection<String> collection) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    ConfigsBuilder configsBuilder = new ConfigsBuilder();
                    collection.forEach(brokerId -> configsBuilder.newResourceConfig()
                        .setType(ConfigResource.Type.BROKER)
                        .setName(brokerId));
                    Map<ConfigResource, Config> described =
                        this.client.describeConfigs(configsBuilder.build().keySet()).all().get();
                    return described.entrySet().stream()
                        .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouRuntimeException(e);
                } catch (ExecutionException e) {
                    throw new JikkouRuntimeException(e);
                }
            });
        }

        /**
         * Asynchronously describes the cluster.
         *
         * @return a future holding the cluster nodes keyed by {@link Node#idString()}.
         */
        private CompletableFuture<Map<String, Node>> describeCluster() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Collection<Node> nodes = this.client.describeCluster().nodes().get();
                    return nodes.stream().collect(Collectors.toMap(Node::idString, node -> node));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouRuntimeException(e);
                } catch (ExecutionException e) {
                    throw new JikkouRuntimeException(e);
                }
            });
        }
    }

    /**
     * Creates a collector without a factory; one is created by
     * {@link #configure(Configuration)}.
     */
    public AdminClientKafkaBrokerCollector() {
    }

    /**
     * Creates a collector that uses the given factory.
     *
     * @param adminClientContextFactory the factory used to open admin-client contexts.
     */
    public AdminClientKafkaBrokerCollector(@NotNull AdminClientContextFactory adminClientContextFactory) {
        this.adminClientContextFactory = adminClientContextFactory;
    }

    /**
     * Creates the {@link AdminClientContextFactory} from the given configuration, unless a
     * factory is already set.
     *
     * @param configuration the configuration handed to the new factory.
     * @throws ConfigException declared by the signature; not thrown directly by this method.
     */
    public void configure(@NotNull Configuration configuration) throws ConfigException {
        LOG.info("Configuring");
        if (this.adminClientContextFactory == null) {
            this.adminClientContextFactory = new AdminClientContextFactory(configuration);
        }
    }

    /**
     * Lists the brokers of the cluster that match the given selectors, using a freshly created
     * admin-client context that is closed before returning.
     *
     * @param configuration the configuration holding the describe-config flags.
     * @param list          the selectors a broker resource must satisfy.
     * @return the matching broker resources.
     */
    public ResourceListObject<V1KafkaBroker> listAll(@NotNull Configuration configuration, @NotNull List<Selector> list) {
        LOG.info("Listing all kafka brokers");
        // try-with-resources closes the context exactly once, on both the normal and the failure path.
        try (AdminClientContext context = this.adminClientContextFactory.createAdminClientContext()) {
            return listAll(configuration, list, context);
        }
    }

    /**
     * Lists the brokers reachable through the given context, keeps those accepted by the
     * selectors and annotates each of them with the cluster id.
     *
     * @param configuration      the configuration holding the describe-config flags.
     * @param list               the selectors a broker resource must satisfy.
     * @param adminClientContext the context providing the admin client and the cluster id;
     *                           it is not closed by this method.
     * @return the matching broker resources.
     */
    @NotNull
    ResourceListObject<V1KafkaBroker> listAll(@NotNull Configuration configuration, @NotNull List<Selector> list, @NotNull AdminClientContext adminClientContext) {
        ConfigDescribeConfiguration describeConfiguration = new ConfigDescribeConfiguration(configuration);
        KafkaConfigPredicate configPredicate = new KafkaConfigPredicate()
            .withDefaultConfig(describeConfiguration.isDescribeDefaultConfigs())
            .withDynamicBrokerConfig(describeConfiguration.isDescribeDynamicBrokerConfigs())
            .withStaticBrokerConfig(describeConfiguration.isDescribeStaticBrokerConfigs());

        List<V1KafkaBroker> brokers = new KafkaBrokerClient(adminClientContext.getAdminClient()).listAll(configPredicate);
        String clusterId = adminClientContext.getClusterId();
        AggregateSelector aggregateSelector = new AggregateSelector(list);

        return new V1KafkaBrokerList(brokers.stream()
            .filter(aggregateSelector::apply)
            .map(broker -> broker.toBuilder()
                .withMetadata(broker.getMetadata().toBuilder()
                    .withAnnotation(MetadataAnnotations.JIKKOU_IO_KAFKA_CLUSTER_ID, clusterId)
                    .build())
                .build())
            .toList());
    }

    /**
     * Blocks until the brokers of the cluster are listed and returns their ids.
     *
     * @param adminClient the admin client passed to {@link KafkaUtils#listBrokers}.
     * @return the string ids of the listed brokers.
     */
    private Collection<String> loadClusterBrokerIds(AdminClient adminClient) {
        return KafkaUtils.listBrokers(adminClient)
            .thenApply(nodes -> nodes.stream()
                .map(Node::idString)
                .collect(Collectors.toList()))
            .join();
    }
}
