package io.streamthoughts.jikkou.kafka.control;

import io.streamthoughts.jikkou.api.AcceptResource;
import io.streamthoughts.jikkou.api.ResourceFilter;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceDescriptor;
import io.streamthoughts.jikkou.api.error.JikkouException;
import io.streamthoughts.jikkou.api.model.ObjectMeta;
import io.streamthoughts.jikkou.kafka.AdminClientContext;
import io.streamthoughts.jikkou.kafka.adapters.KafkaConfigsAdapter;
import io.streamthoughts.jikkou.kafka.internals.ConfigsBuilder;
import io.streamthoughts.jikkou.kafka.internals.KafkaUtils;
import io.streamthoughts.jikkou.kafka.models.V1KafkaBrokerList;
import io.streamthoughts.jikkou.kafka.models.V1KafkaBrokerObject;
import io.streamthoughts.jikkou.kafka.models.V1KafkaBrokersSpec;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;

/**
 * Resource descriptor that builds a {@link V1KafkaBrokerList} from a live Kafka cluster
 * through the Kafka {@link AdminClient}.
 *
 * <p>For every node reported by the cluster, the resulting list carries the node id, host,
 * port and rack, together with the broker configuration entries accepted by the
 * configured {@link ConfigEntry} predicate.
 */
@AcceptResource(type = V1KafkaBrokerList.class)
public final class AdminClientKafkaBrokerDescriptor extends AdminClientKafkaController implements ResourceDescriptor<V1KafkaBrokerList> {

    /**
     * Helper bound to a single {@link AdminClient} that fetches the cluster nodes and their
     * broker configurations and merges both into {@link V1KafkaBrokerObject} instances.
     */
    public final class DescribeBrokers {
        private final AdminClient client;
        private final Predicate<ConfigEntry> configEntryPredicate;

        /**
         * Creates a new helper.
         *
         * @param adminClient the client used to query the cluster.
         * @param predicate   the filter handed to {@link KafkaConfigsAdapter} together with each
         *                    broker {@link Config}.
         */
        public DescribeBrokers(AdminClient adminClient, Predicate<ConfigEntry> predicate) {
            this.client = adminClient;
            this.configEntryPredicate = predicate;
        }

        /**
         * Describes all brokers of the cluster.
         *
         * <p>The cluster description and the broker configurations are requested as two
         * asynchronous tasks and combined once both are available; this method blocks until
         * the combined result is ready.
         *
         * @return one {@link V1KafkaBrokerObject} per node returned by the cluster description.
         * @throws JikkouException if the calling thread is interrupted (the interrupt flag is
         *                         restored) or if one of the underlying requests fails.
         */
        public List<V1KafkaBrokerObject> describe() {
            Collection<String> brokerIds = AdminClientKafkaBrokerDescriptor.this.loadClusterBrokerIds(this.client);
            try {
                return describeCluster()
                        .thenCombine(describeConfigs(brokerIds), this::toBrokerObjects)
                        .get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new JikkouException(e);
            } catch (ExecutionException e) {
                // The async stages already translate their failures into JikkouException;
                // rethrow that one as-is instead of burying it under a second
                // JikkouException/ExecutionException pair.
                Throwable cause = e.getCause();
                if (cause instanceof JikkouException) {
                    throw (JikkouException) cause;
                }
                throw new JikkouException(e);
            }
        }

        /**
         * Merges the described nodes with their configurations.
         *
         * @param nodesById   the cluster nodes keyed by {@link Node#idString()}.
         * @param configsById the broker configurations keyed by broker id.
         * @return the broker objects, one per entry of {@code nodesById}.
         */
        private List<V1KafkaBrokerObject> toBrokerObjects(Map<String, Node> nodesById,
                                                          Map<String, Config> configsById) {
            return nodesById.values()
                    .stream()
                    .map(node -> toBrokerObject(node, configsById.get(node.idString())))
                    .collect(Collectors.toList());
        }

        /**
         * Builds the broker object of a single node.
         *
         * @param node   the cluster node.
         * @param config the configuration looked up for that node; passed unchanged to
         *               {@link KafkaConfigsAdapter}.
         * @return the broker object.
         */
        private V1KafkaBrokerObject toBrokerObject(Node node, Config config) {
            return new V1KafkaBrokerObject(
                    node.idString(),
                    node.host(),
                    Integer.valueOf(node.port()),
                    node.rack(),
                    KafkaConfigsAdapter.of(config, this.configEntryPredicate));
        }

        /**
         * Asynchronously describes the {@code BROKER} configuration of the given brokers.
         *
         * @param collection the ids of the brokers to describe.
         * @return a future of the configurations keyed by the name of their config resource.
         */
        private CompletableFuture<Map<String, Config>> describeConfigs(Collection<String> collection) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    ConfigsBuilder resources = new ConfigsBuilder();
                    collection.forEach(brokerId ->
                            resources.newResourceConfig().setType(ConfigResource.Type.BROKER).setName(brokerId));
                    Map<ConfigResource, Config> described =
                            this.client.describeConfigs(resources.build().keySet()).all().get();
                    return described.entrySet()
                            .stream()
                            .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouException(e);
                } catch (ExecutionException e) {
                    throw new JikkouException(e);
                }
            });
        }

        /**
         * Asynchronously describes the nodes of the cluster.
         *
         * @return a future of the cluster nodes keyed by {@link Node#idString()}.
         */
        private CompletableFuture<Map<String, Node>> describeCluster() {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Collection<Node> nodes = this.client.describeCluster().nodes().get();
                    return nodes.stream().collect(Collectors.toMap(Node::idString, node -> node));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new JikkouException(e);
                } catch (ExecutionException e) {
                    throw new JikkouException(e);
                }
            });
        }
    }

    /**
     * Creates a new descriptor using the no-arg constructor of the parent controller.
     */
    public AdminClientKafkaBrokerDescriptor() {
    }

    /**
     * Creates a new descriptor from the given configuration.
     *
     * @param configuration the configuration passed to the parent controller.
     */
    public AdminClientKafkaBrokerDescriptor(@NotNull Configuration configuration) {
        super(configuration);
    }

    /**
     * Creates a new descriptor on top of an existing admin client context.
     *
     * @param adminClientContext the context passed to the parent controller.
     */
    public AdminClientKafkaBrokerDescriptor(@NotNull AdminClientContext adminClientContext) {
        super(adminClientContext);
    }

    /**
     * Describes the brokers of the cluster reachable through the admin client context.
     *
     * <p>The returned list is annotated with {@code jikkou.io/kafka-cluster-id} set to the
     * cluster id reported by the admin client context.
     *
     * <p>NOTE(review): the decompiler renamed this method from {@code describe} when it merged
     * the bridge method; the name is kept here so that existing references keep resolving.
     * The {@code resourceFilter} argument is not used by this implementation.
     *
     * @param configuration  the configuration used to build the {@link ConfigEntry} predicate.
     * @param resourceFilter the resource filter (unused).
     * @return the list of brokers.
     */
    /* renamed from: describe, reason: merged with bridge method [inline-methods] */
    public V1KafkaBrokerList m6describe(@NotNull Configuration configuration, @NotNull ResourceFilter resourceFilter) {
        ConfigDescribeConfiguration describeConfiguration = new ConfigDescribeConfiguration(configuration);
        ObjectMeta metadata = ObjectMeta.builder()
                .withAnnotation("jikkou.io/kafka-cluster-id", this.adminClientContext.getClusterId())
                .build();
        List<V1KafkaBrokerObject> brokers = this.adminClientContext.invoke(adminClient ->
                new DescribeBrokers(adminClient, describeConfiguration.configEntryPredicate()).describe());
        return new V1KafkaBrokerList().toBuilder()
                .withMetadata(metadata)
                .withSpec(V1KafkaBrokersSpec.builder().withBrokers(brokers).build())
                .build();
    }

    /**
     * Loads the ids of all brokers listed by {@link KafkaUtils#listBrokers}.
     *
     * <p>This call blocks on {@code join()}; a failure of the listing is therefore rethrown
     * unchecked by the future itself and is not translated into a {@link JikkouException}.
     *
     * @param adminClient the client used to list the brokers.
     * @return the broker ids, as returned by {@code idString()} on each listed broker.
     */
    private Collection<String> loadClusterBrokerIds(AdminClient adminClient) {
        return KafkaUtils.listBrokers(adminClient)
                .thenApply(brokers -> brokers.stream().map(Node::idString).collect(Collectors.toList()))
                .join();
    }
}
