package io.streamthoughts.jikkou.kafka.reconcilier.service;

import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.common.utils.Pair;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.kafka.MetadataAnnotations;
import io.streamthoughts.jikkou.kafka.collections.V1KafkaConsumerGroupList;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroup;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroupMember;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroupSpec;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerOffset;
import io.streamthoughts.jikkou.kafka.models.V1KafkaNode;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconcilier/service/KafkaConsumerGroupService.class */
public final class KafkaConsumerGroupService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerGroupService.class);
    private final AdminClient client;

    public KafkaConsumerGroupService(@NotNull AdminClient adminClient) {
        this.client = (AdminClient) Objects.requireNonNull(adminClient, "client cannot be null");
    }

    public V1KafkaConsumerGroupList listConsumerGroups(@NotNull Set<ConsumerGroupState> set, boolean z) {
        return getV1KafkaConsumerGroups(z, getConsumerGroupIds(set));
    }

    @NotNull
    private V1KafkaConsumerGroupList getV1KafkaConsumerGroups(boolean z, @NotNull List<String> list) {
        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult;
        List list2 = this.client.describeConsumerGroups(list).describedGroups().entrySet().stream().map(Pair::of).map(pair -> {
            return pair.mapRight(Futures::toCompletableFuture);
        }).toList();
        if (z) {
            listConsumerGroupOffsetsResult = this.client.listConsumerGroupOffsets((Map) list.stream().collect(Collectors.toMap(Function.identity(), str -> {
                return new ListConsumerGroupOffsetsSpec();
            })));
        } else {
            listConsumerGroupOffsetsResult = null;
        }
        ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult2 = listConsumerGroupOffsetsResult;
        return new V1KafkaConsumerGroupList((List) AsyncUtils.getValueOrThrowException(AsyncUtils.waitForAll(list2.stream().map(pair2 -> {
            return mergeResults((String) pair2._1(), (CompletableFuture) pair2._2(), listConsumerGroupOffsetsResult2);
        }).toList()), th -> {
            LOG.error("Failed to describe consumer groups.", th);
            return new JikkouRuntimeException(String.format("Failed to describe consumer groups. Cause %s: %s.", th.getClass().getSimpleName(), th.getLocalizedMessage()));
        }));
    }

    @NotNull
    private List<String> getConsumerGroupIds(@NotNull Set<ConsumerGroupState> set) {
        return ((Collection) AsyncUtils.getValueOrThrowException(Futures.toCompletableFuture(this.client.listConsumerGroups(new ListConsumerGroupsOptions().inStates(set)).all()), th -> {
            LOG.error("Failed to list consumer groups.", th);
            return new JikkouRuntimeException(String.format("Failed to list consumer groups. Cause %s: %s.", th.getClass().getSimpleName(), th.getLocalizedMessage()));
        })).stream().map((v0) -> {
            return v0.groupId();
        }).toList();
    }

    private CompletableFuture<V1KafkaConsumerGroup> mergeResults(@NotNull String str, @NotNull CompletableFuture<ConsumerGroupDescription> completableFuture, @Nullable ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult) {
        return listConsumerGroupOffsetsResult == null ? completableFuture.thenApply(this::mapToResource) : completableFuture.thenCombine((CompletionStage) Futures.toCompletableFuture(listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata(str)), this::mapToResource);
    }

    public V1KafkaConsumerGroup mapToResource(@NotNull ConsumerGroupDescription consumerGroupDescription) {
        return mapToResource(consumerGroupDescription, null);
    }

    public V1KafkaConsumerGroup mapToResource(@NotNull ConsumerGroupDescription consumerGroupDescription, @Nullable Map<TopicPartition, OffsetAndMetadata> map) {
        return V1KafkaConsumerGroup.builder().withMetadata(ObjectMeta.builder().withName(consumerGroupDescription.groupId()).withLabel(MetadataAnnotations.JIKKOU_IO_KAFKA_IS_SIMPLE_CONSUMER, Boolean.valueOf(consumerGroupDescription.isSimpleConsumerGroup())).build()).withSpec(V1KafkaConsumerGroupSpec.builder().withState(consumerGroupDescription.state().name()).withCoordinator(V1KafkaNode.builder().withId(consumerGroupDescription.coordinator().idString()).withHost(consumerGroupDescription.coordinator().host()).withPort(Integer.valueOf(consumerGroupDescription.coordinator().port())).withRack(consumerGroupDescription.coordinator().rack()).build()).withMembers(consumerGroupDescription.members().stream().map(memberDescription -> {
            V1KafkaConsumerGroupMember.V1KafkaConsumerGroupMemberBuilder withMemberId = V1KafkaConsumerGroupMember.builder().withHost(memberDescription.host()).withClientId(memberDescription.clientId()).withMemberId(memberDescription.consumerId());
            Optional groupInstanceId = memberDescription.groupInstanceId();
            Objects.requireNonNull(withMemberId);
            V1KafkaConsumerGroupMember.V1KafkaConsumerGroupMemberBuilder withAssignments = ((V1KafkaConsumerGroupMember.V1KafkaConsumerGroupMemberBuilder) groupInstanceId.map(withMemberId::withGroupInstanceId).orElse(withMemberId)).withAssignments(memberDescription.assignment().topicPartitions().stream().map((v0) -> {
                return v0.toString();
            }).toList());
            if (map != null) {
                withAssignments = withAssignments.withOffsets(map.entrySet().stream().map(entry -> {
                    return new V1KafkaConsumerOffset(((TopicPartition) entry.getKey()).topic(), Integer.valueOf(((TopicPartition) entry.getKey()).partition()), Long.valueOf(((OffsetAndMetadata) entry.getValue()).offset()));
                }).toList());
            }
            return withAssignments.build();
        }).toList()).build()).build();
    }
}
