package io.streamthoughts.jikkou.kafka.reconciler.service;

import io.streamthoughts.jikkou.common.utils.AsyncUtils;
import io.streamthoughts.jikkou.common.utils.Strings;
import io.streamthoughts.jikkou.core.exceptions.JikkouRuntimeException;
import io.streamthoughts.jikkou.core.models.ObjectMeta;
import io.streamthoughts.jikkou.kafka.KafkaLabelAndAnnotations;
import io.streamthoughts.jikkou.kafka.collections.V1KafkaConsumerGroupList;
import io.streamthoughts.jikkou.kafka.internals.Futures;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroup;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroupMember;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerGroupStatus;
import io.streamthoughts.jikkou.kafka.models.V1KafkaConsumerOffset;
import io.streamthoughts.jikkou.kafka.models.V1KafkaNode;
import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaOffsetSpec;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/reconciler/service/KafkaConsumerGroupService.class */
public final class KafkaConsumerGroupService {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerGroupService.class);
    private final AdminClient client;

    public KafkaConsumerGroupService(@NotNull AdminClient adminClient) {
        this.client = (AdminClient) Objects.requireNonNull(adminClient, "client cannot be null");
    }

    public V1KafkaConsumerGroup resetConsumerGroupOffsets(@NotNull String str, @NotNull List<String> list, @NotNull KafkaOffsetSpec kafkaOffsetSpec, boolean z) {
        switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), KafkaOffsetSpec.ToEarliest.class, KafkaOffsetSpec.ToLatest.class, KafkaOffsetSpec.ToTimestamp.class, KafkaOffsetSpec.ToOffset.class).dynamicInvoker().invoke(kafkaOffsetSpec, 0) /* invoke-custom */) {
            case -1:
                throw new IllegalArgumentException("offsetSpec cannot be null");
            case 0:
                return resetConsumerGroupOffsets(str, list, OffsetSpec.earliest(), z);
            case 1:
                return resetConsumerGroupOffsets(str, list, OffsetSpec.latest(), z);
            case 2:
                return resetConsumerGroupOffsets(str, list, OffsetSpec.forTimestamp(((KafkaOffsetSpec.ToTimestamp) kafkaOffsetSpec).timestamp().longValue()), z);
            case 3:
                KafkaOffsetSpec.ToOffset toOffset = (KafkaOffsetSpec.ToOffset) kafkaOffsetSpec;
                return alterConsumerGroupOffsets(str, (Map) ((List) AsyncUtils.getValueOrThrowException(listTopicPartitions(list), JikkouRuntimeException::new)).stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                    return new OffsetAndMetadata(toOffset.offset().longValue());
                })), z);
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    public V1KafkaConsumerGroup resetConsumerGroupOffsets(@NotNull String str, @NotNull List<String> list, @NotNull OffsetSpec offsetSpec, boolean z) {
        if (Strings.isBlank(str)) {
            throw new IllegalArgumentException("groupId cannot be null");
        }
        if (list == null) {
            throw new IllegalArgumentException("topics cannot be null");
        }
        return alterConsumerGroupOffsets(str, (Map) ((Map) AsyncUtils.getValueOrThrowException(listOffsets(list, offsetSpec), JikkouRuntimeException::new)).entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return new OffsetAndMetadata(((ListOffsetsResult.ListOffsetsResultInfo) entry.getValue()).offset());
        })), z);
    }

    private V1KafkaConsumerGroup alterConsumerGroupOffsets(@NotNull String str, @NotNull Map<TopicPartition, OffsetAndMetadata> map, boolean z) {
        if (LOG.isInfoEnabled()) {
            LOG.info("Altering offsets for consumer group '{}': {} (DRY_RUN: {}).", new Object[]{str, map, Boolean.valueOf(z)});
        }
        if (!z) {
            AsyncUtils.getValueOrThrowException(this.client.alterConsumerGroupOffsets(str, map).all().toCompletionStage().toCompletableFuture(), JikkouRuntimeException::new);
        }
        V1KafkaConsumerGroup v1KafkaConsumerGroup = (V1KafkaConsumerGroup) listConsumerGroups(List.of(str), true).first();
        if (z) {
            V1KafkaConsumerGroupStatus status = v1KafkaConsumerGroup.getStatus();
            Map map2 = (Map) status.getOffsets().stream().collect(Collectors.toMap(v1KafkaConsumerOffset -> {
                return new TopicPartition(v1KafkaConsumerOffset.getTopic(), v1KafkaConsumerOffset.getPartition().intValue());
            }, v1KafkaConsumerOffset2 -> {
                return v1KafkaConsumerOffset2;
            }));
            HashMap hashMap = new HashMap((Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return V1KafkaConsumerOffset.builder().withTopic(((TopicPartition) entry.getKey()).topic()).withPartition(Integer.valueOf(((TopicPartition) entry.getKey()).partition())).withOffset(Long.valueOf(((OffsetAndMetadata) entry.getValue()).offset())).build();
            })));
            map2.forEach((topicPartition, v1KafkaConsumerOffset3) -> {
                if (hashMap.containsKey(topicPartition)) {
                    return;
                }
                hashMap.put(topicPartition, v1KafkaConsumerOffset3);
            });
            v1KafkaConsumerGroup = v1KafkaConsumerGroup.withStatus(status.withOffsets(new ArrayList(hashMap.values())));
        }
        return v1KafkaConsumerGroup;
    }

    CompletableFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> listOffsets(@NotNull List<String> list, @NotNull OffsetSpec offsetSpec) {
        return listTopicPartitions(list).thenCompose(list2 -> {
            return this.client.listOffsets((Map) list2.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                return offsetSpec;
            }))).all().toCompletionStage();
        });
    }

    CompletableFuture<List<TopicPartition>> listTopicPartitions(@NotNull List<String> list) {
        return this.client.describeTopics(list).allTopicNames().toCompletionStage().thenApply(map -> {
            return map.values().stream().flatMap(topicDescription -> {
                return topicDescription.partitions().stream().map(topicPartitionInfo -> {
                    return new TopicPartition(topicDescription.name(), topicPartitionInfo.partition());
                });
            }).toList();
        }).toCompletableFuture();
    }

    @NotNull
    public V1KafkaConsumerGroupList listConsumerGroups(@NotNull Set<ConsumerGroupState> set, boolean z) {
        return listConsumerGroups(getConsumerGroupIds(set), z);
    }

    @NotNull
    public V1KafkaConsumerGroupList listConsumerGroups(@NotNull List<String> list, boolean z) {
        try {
            ListConsumerGroupOffsetsResult listConsumerGroupOffsets = z ? this.client.listConsumerGroupOffsets((Map) list.stream().collect(Collectors.toMap(Function.identity(), str -> {
                return new ListConsumerGroupOffsetsSpec();
            }))) : null;
            return new V1KafkaConsumerGroupList((List) Flux.fromStream(this.client.describeConsumerGroups(list).describedGroups().values().stream().map(kafkaFuture -> {
                return Futures.toCompletableFuture(kafkaFuture).thenApply(this::mapToResource);
            })).publishOn(Schedulers.boundedElastic()).flatMap(Mono::fromFuture).map(v1KafkaConsumerGroup -> {
                String name = v1KafkaConsumerGroup.getMetadata().getName();
                if (listConsumerGroupOffsets == null) {
                    return v1KafkaConsumerGroup;
                }
                return v1KafkaConsumerGroup.withStatus(v1KafkaConsumerGroup.getStatus().withOffsets(mapToResources((Map) AsyncUtils.getValueOrThrowException(Futures.toCompletableFuture(listConsumerGroupOffsets.partitionsToOffsetAndMetadata(name)), JikkouRuntimeException::new))));
            }).collectList().block());
        } catch (Exception e) {
            LOG.error("Failed to describe consumer groups.", e);
            throw new JikkouRuntimeException(String.format("Failed to describe consumer groups. Cause %s: %s.", e.getClass().getSimpleName(), e.getLocalizedMessage()));
        }
    }

    @NotNull
    private List<String> getConsumerGroupIds(@NotNull Set<ConsumerGroupState> set) {
        return ((Collection) AsyncUtils.getValueOrThrowException(Futures.toCompletableFuture(this.client.listConsumerGroups(new ListConsumerGroupsOptions().inStates(set)).all()), th -> {
            LOG.error("Failed to list consumer groups.", th);
            return new JikkouRuntimeException(String.format("Failed to list consumer groups. Cause %s: %s.", th.getClass().getSimpleName(), th.getLocalizedMessage()));
        })).stream().map((v0) -> {
            return v0.groupId();
        }).toList();
    }

    public V1KafkaConsumerGroup mapToResource(@NotNull ConsumerGroupDescription consumerGroupDescription) {
        return V1KafkaConsumerGroup.builder().withMetadata(ObjectMeta.builder().withName(consumerGroupDescription.groupId()).withLabel(KafkaLabelAndAnnotations.JIKKOU_IO_KAFKA_IS_SIMPLE_CONSUMER, Boolean.valueOf(consumerGroupDescription.isSimpleConsumerGroup())).build()).withStatus(V1KafkaConsumerGroupStatus.builder().withState(consumerGroupDescription.state().name()).withCoordinator(V1KafkaNode.builder().withId(consumerGroupDescription.coordinator().idString()).withHost(consumerGroupDescription.coordinator().host()).withPort(Integer.valueOf(consumerGroupDescription.coordinator().port())).withRack(consumerGroupDescription.coordinator().rack()).build()).withMembers(consumerGroupDescription.members().stream().map(memberDescription -> {
            V1KafkaConsumerGroupMember.V1KafkaConsumerGroupMemberBuilder withMemberId = V1KafkaConsumerGroupMember.builder().withHost(memberDescription.host()).withClientId(memberDescription.clientId()).withMemberId(memberDescription.consumerId());
            Optional groupInstanceId = memberDescription.groupInstanceId();
            Objects.requireNonNull(withMemberId);
            return ((V1KafkaConsumerGroupMember.V1KafkaConsumerGroupMemberBuilder) groupInstanceId.map(withMemberId::withGroupInstanceId).orElse(withMemberId)).withAssignments(memberDescription.assignment().topicPartitions().stream().map((v0) -> {
                return v0.toString();
            }).toList()).build();
        }).toList()).build()).build();
    }

    private List<V1KafkaConsumerOffset> mapToResources(Map<TopicPartition, OffsetAndMetadata> map) {
        Map<TopicPartition, Long> logEndOffsetForTopicPartition = new KafkaTopicService(this.client).getLogEndOffsetForTopicPartition(map.keySet());
        return map.entrySet().stream().map(entry -> {
            TopicPartition topicPartition = (TopicPartition) entry.getKey();
            long offset = ((OffsetAndMetadata) entry.getValue()).offset();
            return new V1KafkaConsumerOffset(topicPartition.topic(), Integer.valueOf(topicPartition.partition()), Long.valueOf(offset), Long.valueOf(((Long) Optional.ofNullable((Long) logEndOffsetForTopicPartition.get(topicPartition)).map(l -> {
                return Long.valueOf(l.longValue() - offset);
            }).orElse(-1L)).longValue()));
        }).toList();
    }
}
