package io.streamthoughts.jikkou.kafka.reconciler.service;

import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.kafka.KafkaLabelAndAnnotations;
import io.streamthoughts.jikkou.kafka.adapters.KafkaConfigsAdapter;
import io.streamthoughts.jikkou.kafka.internals.ConfigsBuilder;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import io.streamthoughts.jikkou.kafka.models.KafkaTopicPartitionInfo;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopic;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicSpec;
import io.streamthoughts.jikkou.kafka.models.V1KafkaTopicStatus;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconciler/service/KafkaTopicService.class */
/**
 * Read-only service for describing Kafka topics through an {@link AdminClient}.
 *
 * <p>The service combines the topic descriptions and the topic configurations returned
 * by the admin client into {@link V1KafkaTopic} resources, and can additionally look up
 * the latest offset of a set of topic partitions.
 */
public final class KafkaTopicService {

    /** Empty set used as the {@code configMapRefs} of every topic built by this service. */
    public static final Set<String> NO_CONFIG_MAP_REFS = Collections.emptySet();

    private final AdminClient client;

    /**
     * Creates a new {@link KafkaTopicService} instance.
     *
     * @param adminClient the admin client used for all Kafka requests; must not be {@code null}.
     * @throws NullPointerException if {@code adminClient} is {@code null}.
     */
    public KafkaTopicService(AdminClient adminClient) {
        this.client = Objects.requireNonNull(adminClient, "client cannot be null");
    }

    /**
     * Lists every topic returned by {@link AdminClient#listTopics()}.
     *
     * @param predicate the predicate handed to {@link KafkaConfigsAdapter#of} together with
     *                  each topic's configuration.
     * @param z         {@code true} to attach a per-partition status (leader, replicas, ISR)
     *                  to each returned topic.
     * @return the list of topic resources.
     * @throws JikkouRuntimeException if the topic names cannot be listed, or if the
     *                                descriptions/configurations cannot be retrieved.
     */
    public List<V1KafkaTopic> listAll(@NotNull Predicate<ConfigEntry> predicate, boolean z) {
        Set<String> topicNames = AsyncUtils.getValueOrThrowException(
                Futures.toCompletableFuture(this.client.listTopics().names()),
                cause -> new JikkouRuntimeException("Failed to list kafka topics", cause)
        );
        return listAll(topicNames, predicate, z);
    }

    /**
     * Lists the topics with the given names.
     *
     * <p>The topic descriptions and the topic configurations are requested as two separate
     * futures and combined once both have completed.
     *
     * @param set       the names of the topics to describe.
     * @param predicate the predicate handed to {@link KafkaConfigsAdapter#of} together with
     *                  each topic's configuration.
     * @param z         {@code true} to attach a per-partition status (leader, replicas, ISR)
     *                  to each returned topic.
     * @return the list of topic resources, one per described topic.
     * @throws JikkouRuntimeException if the descriptions or configurations cannot be retrieved.
     */
    public List<V1KafkaTopic> listAll(@NotNull Set<String> set, @NotNull Predicate<ConfigEntry> predicate, boolean z) {
        CompletableFuture<List<V1KafkaTopic>> topics = getDescriptionForTopics(set).thenCombine(
                getConfigForTopics(set),
                (descriptions, configs) -> descriptions.values().stream()
                        .map(description -> newTopicResources(description, configs.get(description.name()), predicate, z))
                        .toList()
        );
        return AsyncUtils.getValueOrThrowException(
                topics,
                cause -> new JikkouRuntimeException("Failed to retrieve kafka topic descriptions/or configurations.", cause)
        );
    }

    /**
     * Builds a {@link V1KafkaTopic} resource from a topic description and its configuration.
     *
     * @param description the topic description.
     * @param config      the topic configuration, as looked up by topic name.
     * @param predicate   the predicate handed to {@link KafkaConfigsAdapter#of}.
     * @param withStatus  {@code true} to attach the per-partition status.
     * @return the topic resource.
     */
    private V1KafkaTopic newTopicResources(TopicDescription description,
                                           Config config,
                                           Predicate<ConfigEntry> predicate,
                                           boolean withStatus) {
        ObjectMeta metadata = ObjectMeta.builder()
                .withName(description.name())
                .withLabel(KafkaLabelAndAnnotations.JIKKOU_IO_KAFKA_TOPIC_ID, description.topicId().toString())
                .build();

        V1KafkaTopicSpec spec = V1KafkaTopicSpec.builder()
                .withPartitions(description.partitions().size())
                .withReplicas((short) computeReplicationFactor(description))
                .withConfigs(KafkaConfigsAdapter.of(config, predicate))
                .withConfigMapRefs(NO_CONFIG_MAP_REFS)
                .build();

        V1KafkaTopic.V1KafkaTopicBuilder builder = V1KafkaTopic.builder()
                .withMetadata(metadata)
                .withSpec(spec);

        if (withStatus) {
            List<KafkaTopicPartitionInfo> partitions = description.partitions().stream()
                    .map(KafkaTopicService::toPartitionInfo)
                    .toList();
            builder = builder.withStatus(V1KafkaTopicStatus.builder().withPartitions(partitions).build());
        }
        return builder.build();
    }

    /**
     * Converts a Kafka partition description into the resource-model partition info.
     *
     * @param info the Kafka partition description.
     * @return the partition info; its leader id is {@code null} when the partition has no leader.
     */
    private static KafkaTopicPartitionInfo toPartitionInfo(TopicPartitionInfo info) {
        // TopicPartitionInfo#leader() returns null when the partition currently has no leader
        // (e.g. offline partition); dereferencing it unconditionally would throw an NPE.
        // NOTE(review): assumes KafkaTopicPartitionInfo tolerates a null leader id - confirm.
        Integer leaderId = info.leader() == null ? null : info.leader().id();
        List<Integer> replicaIds = info.replicas().stream().map(node -> node.id()).toList();
        List<Integer> isrIds = info.isr().stream().map(node -> node.id()).toList();
        return new KafkaTopicPartitionInfo(info.partition(), leaderId, replicaIds, isrIds);
    }

    /**
     * Gets the latest offset ({@link OffsetSpec#latest()}) of each given topic partition.
     *
     * @param set the topic partitions to look up.
     * @return a map from topic partition to the offset reported by the admin client.
     * @throws JikkouRuntimeException if the request fails or the calling thread is interrupted;
     *                                in the latter case the interrupt flag is restored first.
     */
    public Map<TopicPartition, Long> getLogEndOffsetForTopicPartition(Set<TopicPartition> set) {
        Map<TopicPartition, OffsetSpec> request = set.stream()
                .collect(Collectors.toMap(Function.identity(), partition -> OffsetSpec.latest()));
        try {
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
                    this.client.listOffsets(request).all().get();
            return offsets.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
        } catch (InterruptedException | ExecutionException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so that callers can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new JikkouRuntimeException("Failed to get log end-offset for topic partitions: " + set, e);
        }
    }

    /**
     * Determines the replication factor of a topic from the replica count of its partitions.
     *
     * @param description the topic description.
     * @return the number of replicas shared by all partitions, or {@code -1} if the partitions
     *         do not all have the same number of replicas or if the topic has no partition.
     */
    private static int computeReplicationFactor(TopicDescription description) {
        int replicationFactor = -1;
        boolean first = true;
        for (TopicPartitionInfo partition : description.partitions()) {
            int replicas = partition.replicas().size();
            if (first) {
                replicationFactor = replicas;
                first = false;
            } else if (replicas != replicationFactor) {
                // Partitions disagree on the replica count: no single replication factor.
                return -1;
            }
        }
        return replicationFactor;
    }

    /**
     * Requests the configuration of the given topics.
     *
     * @param topicNames the names of the topics.
     * @return a future of the topic configurations keyed by topic name.
     */
    private CompletableFuture<Map<String, Config>> getConfigForTopics(Collection<String> topicNames) {
        ConfigsBuilder configsBuilder = new ConfigsBuilder();
        for (String topicName : topicNames) {
            configsBuilder.newResourceConfig()
                    .setType(ConfigResource.Type.TOPIC)
                    .setName(topicName);
        }
        return Futures.toCompletableFuture(this.client.describeConfigs(configsBuilder.build().keySet()).all())
                .thenApply(configs -> configs.entrySet().stream()
                        .collect(Collectors.toMap(entry -> entry.getKey().name(), Map.Entry::getValue)));
    }

    /**
     * Requests the description of the given topics.
     *
     * @param topicNames the names of the topics.
     * @return a future of the topic descriptions keyed by topic name.
     */
    private CompletableFuture<Map<String, TopicDescription>> getDescriptionForTopics(Collection<String> topicNames) {
        return Futures.toCompletableFuture(this.client.describeTopics(topicNames).allTopicNames());
    }
}
