package pw.avvero.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.kafka.KafkaConnectionDetails;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;

/**
 * Static helpers for Kafka-based tests: waiting until listener containers have partitions
 * assigned, and waiting until consumer groups have committed offsets up to the current end
 * offsets of the observed partitions.
 */
public class KafkaSupport {
    private static final Logger log = LoggerFactory.getLogger(KafkaSupport.class);

    /** Maximum number of polling attempts before the offset-commit wait fails with an exception. */
    public static int OFFSET_COMMIT_WAIT_ATTEMPTS_MAX = 200;

    /** Pause between polling attempts, in milliseconds (passed to {@link Thread#sleep(long)}). */
    public static int OFFSET_COMMIT_WAIT_TIME = 10;

    /**
     * Sum of end offsets recorded by the last successful wait on the current thread. A later wait
     * on the same thread that observes the same sum returns immediately.
     */
    static ThreadLocal<Long> topicsOffsetsTotalInThread = new ThreadLocal<>();

    /**
     * Waits for partition assignment on every listener container registered in the context's
     * {@link KafkaListenerEndpointRegistry}.
     *
     * <p>Before waiting, the listener configuration is checked for several containers consuming
     * the same topic within the same group. A container for which the wait reports no assignment
     * is logged as an error; no exception is thrown by this method for that case.
     *
     * @param applicationContext context providing the {@link KafkaListenerEndpointRegistry} bean
     * @throws RuntimeException if several containers listen to one topic within one group
     * @throws Exception if waiting for an assignment fails
     */
    public static void waitForPartitionAssignment(ApplicationContext applicationContext) throws Exception {
        detectMultipleContainersForSameTopicWithinSameGroup(applicationContext);
        KafkaListenerEndpointRegistry registry = applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        log.debug("[KTS] Waiting for partition assignment is requested");
        long startedAt = System.currentTimeMillis();
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            long containerStartedAt = System.currentTimeMillis();
            log.debug("[KTS] Waiting for partition assignment started for {}", container.getListenerId());
            // NOTE(review): the second argument is presumably the expected partition count - confirm
            // against ContainerTestUtils.
            int assigned = ContainerTestUtils.waitForAssignment(container, 1);
            long elapsed = System.currentTimeMillis() - containerStartedAt;
            if (assigned > 0) {
                Collection<TopicPartition> assignedPartitions =
                        Objects.requireNonNull(container.getAssignedPartitions());
                String topics = assignedPartitions.stream()
                        .map(TopicPartition::topic)
                        .collect(Collectors.joining(", "));
                log.debug("[KTS] Waiting for partition assignment for {} is succeeded in {} ms, topics: {}",
                        container.getListenerId(), elapsed, topics);
            } else {
                log.error("[KTS] Waiting for partition assignment for {} is failed in {} ms",
                        container.getListenerId(), elapsed);
            }
        }
        log.debug("[KTS] Waiting for partition assignment is finished in {} ms. At least one partition is assigned for every container",
                System.currentTimeMillis() - startedAt);
    }

    /**
     * Fails when more than one listener container is configured for the same topic within the
     * same consumer group. Only explicitly configured topics ({@code getTopics()}) are inspected.
     *
     * @param applicationContext context providing the {@link KafkaListenerEndpointRegistry} bean
     * @throws RuntimeException naming the conflicting listeners, the topic and the group
     */
    private static void detectMultipleContainersForSameTopicWithinSameGroup(ApplicationContext applicationContext) {
        KafkaListenerEndpointRegistry registry = applicationContext.getBean(KafkaListenerEndpointRegistry.class);
        // group id -> topic -> containers. A nested map is used instead of a concatenated string key
        // so that group ids containing the separator cannot be mis-parsed when building the message.
        Map<String, Map<String, List<MessageListenerContainer>>> containersByGroupAndTopic = new HashMap<>();
        for (MessageListenerContainer container : registry.getListenerContainers()) {
            ContainerProperties properties = container.getContainerProperties();
            String[] topics = properties.getTopics();
            if (topics == null) {
                continue;
            }
            for (String topic : topics) {
                containersByGroupAndTopic
                        .computeIfAbsent(properties.getGroupId(), groupId -> new HashMap<>())
                        .computeIfAbsent(topic, topicName -> new ArrayList<>())
                        .add(container);
            }
        }
        for (Map.Entry<String, Map<String, List<MessageListenerContainer>>> byGroup : containersByGroupAndTopic.entrySet()) {
            for (Map.Entry<String, List<MessageListenerContainer>> byTopic : byGroup.getValue().entrySet()) {
                List<MessageListenerContainer> containers = byTopic.getValue();
                if (containers.size() > 1) {
                    String listenerIds = containers.stream()
                            .map(MessageListenerContainer::getListenerId)
                            .collect(Collectors.joining(", "));
                    throw new RuntimeException(String.format("Detected multiple Kafka listener containers (%s) configured to listen to topic '%s' within the same group '%s'. This configuration may lead to unexpected behavior or message duplication. Please ensure each topic is consumed by a unique group or container.", listenerIds, byTopic.getKey(), byGroup.getKey()));
                }
            }
        }
    }

    /**
     * Waits for offset commit using the bootstrap servers of the context's
     * {@link KafkaConnectionDetails} bean.
     *
     * @param applicationContext context providing the {@link KafkaConnectionDetails} bean
     * @throws ExecutionException declared for compatibility with existing callers
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void waitForPartitionOffsetCommit(ApplicationContext applicationContext) throws ExecutionException, InterruptedException {
        KafkaConnectionDetails connectionDetails = applicationContext.getBean(KafkaConnectionDetails.class);
        waitForPartitionOffsetCommit(connectionDetails.getBootstrapServers());
    }

    /**
     * Creates a temporary {@link AdminClient} for the given bootstrap servers, waits for offset
     * commit on all topics and consumer groups it lists, and closes the client afterwards.
     *
     * @param list bootstrap server addresses
     * @throws RuntimeException wrapping an {@link InterruptedException} (the interrupt flag is
     *         restored first) or an {@link ExecutionException} raised while waiting
     */
    public static void waitForPartitionOffsetCommit(List<String> list) {
        try (AdminClient adminClient = AdminClient.create(Collections.singletonMap("bootstrap.servers", list))) {
            // The catch clauses stay inside the resource scope so that the client is closed after
            // the interrupt flag has been restored, exactly as before.
            try {
                waitForPartitionOffsetCommit(adminClient);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Waits for offset commit on every topic returned by {@code adminClient.listTopics()}.
     *
     * @param adminClient client used for all broker queries
     * @throws ExecutionException if a broker query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void waitForPartitionOffsetCommit(AdminClient adminClient) throws ExecutionException, InterruptedException {
        Set<String> topics = adminClient.listTopics().namesToListings().get().keySet();
        waitForPartitionOffsetCommitForTopics(adminClient, topics);
    }

    /**
     * Waits for offset commit on all partitions of the given topics.
     *
     * @param adminClient client used for all broker queries
     * @param set topic names
     * @throws ExecutionException if a broker query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void waitForPartitionOffsetCommitForTopics(AdminClient adminClient, Set<String> set) throws ExecutionException, InterruptedException {
        waitForPartitionOffsetCommitForPartitions(adminClient, getPartitions(adminClient, set));
    }

    /**
     * Waits for offset commit on the given partitions for every consumer group returned by
     * {@code adminClient.listConsumerGroups()}.
     *
     * @param adminClient client used for all broker queries
     * @param set partitions to observe
     * @throws ExecutionException if a broker query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static void waitForPartitionOffsetCommitForPartitions(AdminClient adminClient, Set<TopicPartition> set) throws ExecutionException, InterruptedException {
        Set<String> groupIds = adminClient.listConsumerGroups().all().get().stream()
                .map(group -> group.groupId())
                .collect(Collectors.toSet());
        waitForPartitionOffsetCommitForPartitions(adminClient, set, groupIds);
    }

    /**
     * Polls until every given consumer group that has a committed offset for a partition has
     * committed exactly the partition's end offset, and the end offsets did not move during the
     * check.
     *
     * <p>Returns immediately when the sum of end offsets equals the sum recorded by the previous
     * successful wait on this thread. At most {@link #OFFSET_COMMIT_WAIT_ATTEMPTS_MAX} attempts
     * are made, {@link #OFFSET_COMMIT_WAIT_TIME} ms apart.
     *
     * @param adminClient client used for all broker queries
     * @param set partitions to observe
     * @param set2 consumer group ids to check
     * @throws RuntimeException if the maximum number of attempts is exceeded
     * @throws InterruptedException if the thread is interrupted while querying or sleeping
     * @throws ExecutionException if a broker query fails
     */
    public static void waitForPartitionOffsetCommitForPartitions(AdminClient adminClient, Set<TopicPartition> set, Set<String> set2) throws InterruptedException, ExecutionException {
        long startedAt = System.currentTimeMillis();
        for (int attempt = 1; attempt <= OFFSET_COMMIT_WAIT_ATTEMPTS_MAX; attempt++) {
            log.debug("[KTS] Waiting for offset commit is requested, attempt {}", attempt);
            Map<TopicPartition, Long> endOffsets = getOffsetsForPartitions(adminClient, set);
            Long endOffsetsTotal = sumOffsets(endOffsets);
            if (endOffsetsTotal.equals(topicsOffsetsTotalInThread.get())) {
                log.debug("[KTS] Topic offset is not changed; Waiting for offset commit is finished in {} ms",
                        System.currentTimeMillis() - startedAt);
                return;
            }
            // Only partitions that already contain records are queried for committed offsets.
            Set<TopicPartition> nonEmptyPartitions = endOffsets.entrySet().stream()
                    .filter(entry -> entry.getValue() != 0L)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            Map<String, Map<TopicPartition, Long>> committedOffsets =
                    getOffsetsForConsumerGroups(adminClient, set2, nonEmptyPartitions);
            // Re-read the end offsets after the check: if they moved, new records arrived meanwhile
            // and the comparison above is already stale.
            boolean settled = checkOffsetCommitted(set, set2, endOffsets, committedOffsets)
                    && endOffsetsTotal.equals(sumOffsets(getOffsetsForPartitions(adminClient, set)));
            if (settled) {
                topicsOffsetsTotalInThread.set(endOffsetsTotal);
                log.debug("[KTS] Waiting for offset commit is finished in {} ms",
                        System.currentTimeMillis() - startedAt);
                return;
            }
            log.warn("[KTS] Some offsets are not equal. Waiting for further message processing before proceeding. Refreshing end offsets and reevaluating.");
            // An interrupt is propagated to the caller instead of being swallowed and polling on.
            Thread.sleep(OFFSET_COMMIT_WAIT_TIME);
        }
        throw new RuntimeException("Exceeded maximum attempts (" + OFFSET_COMMIT_WAIT_ATTEMPTS_MAX + ") waiting for offset commit for partitions.");
    }

    /** Returns the sum of all offsets in the given map. */
    private static Long sumOffsets(Map<TopicPartition, Long> offsets) {
        return offsets.values().stream().mapToLong(Long::longValue).sum();
    }

    /**
     * Compares committed offsets with end offsets and logs a snapshot of all compared pairs.
     *
     * <p>A (group, partition) pair without a committed offset is skipped. A pair counts as
     * committed when the partition has no known end offset, its end offset is zero, or the end
     * offset equals the committed offset.
     *
     * @param set partitions to check
     * @param set2 consumer group ids to check
     * @param map end offset per partition
     * @param map2 committed offset per partition, per consumer group
     * @return {@code true} if every compared pair counts as committed
     */
    private static boolean checkOffsetCommitted(Set<TopicPartition> set, Set<String> set2, Map<TopicPartition, Long> map, Map<String, Map<TopicPartition, Long>> map2) {
        boolean allCommitted = true;
        OffsetSnapshotFrame snapshot = new OffsetSnapshotFrame();
        for (String groupId : set2) {
            Map<TopicPartition, Long> committedByPartition = map2.getOrDefault(groupId, Map.of());
            for (TopicPartition partition : set) {
                Long committedOffset = committedByPartition.get(partition);
                if (committedOffset == null) {
                    continue;
                }
                Long endOffset = map.get(partition);
                boolean caughtUp = endOffset == null
                        || endOffset.longValue() == 0L
                        || endOffset.equals(committedOffset);
                allCommitted = allCommitted && caughtUp;
                snapshot.append(groupId, partition, committedOffset, endOffset);
            }
            snapshot.split();
        }
        // Parameterized so the snapshot is rendered only when debug logging is enabled.
        log.debug("{}", snapshot);
        return allCommitted;
    }

    /**
     * Builds the set of partitions for the given topics from {@code adminClient.describeTopics}.
     * Partitions are numbered from zero up to the described partition count of each topic.
     *
     * @param adminClient client used to describe the topics
     * @param set topic names
     * @return all partitions of the given topics
     * @throws ExecutionException if describing a topic fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static Set<TopicPartition> getPartitions(AdminClient adminClient, Set<String> set) throws ExecutionException, InterruptedException {
        Set<TopicPartition> partitions = new HashSet<>();
        DescribeTopicsResult describeTopics = adminClient.describeTopics(set);
        for (String topic : set) {
            KafkaFuture<TopicDescription> description = describeTopics.topicNameValues().get(topic);
            int partitionCount = description.get().partitions().size();
            for (int partition = 0; partition < partitionCount; partition++) {
                partitions.add(new TopicPartition(topic, partition));
            }
        }
        return partitions;
    }

    /**
     * Fetches the latest (end) offset of every given partition.
     *
     * @param adminClient client used to list the offsets
     * @param set partitions to query
     * @return end offset per partition
     * @throws ExecutionException if the broker query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static Map<TopicPartition, Long> getOffsetsForPartitions(AdminClient adminClient, Set<TopicPartition> set) throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> latestOffsetRequest = set.stream()
                .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetInfos =
                adminClient.listOffsets(latestOffsetRequest).all().get();
        return offsetInfos.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
    }

    /**
     * Fetches the committed offsets of the given consumer groups for the given partitions.
     *
     * <p>Partitions for which a group has no committed offset are omitted; a group without any
     * committed offset is absent from the result.
     *
     * @param adminClient client used to list the committed offsets
     * @param set consumer group ids
     * @param set2 partitions to query for each group
     * @return committed offset per partition, per consumer group
     * @throws ExecutionException if the broker query fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public static Map<String, Map<TopicPartition, Long>> getOffsetsForConsumerGroups(AdminClient adminClient, Set<String> set, Set<TopicPartition> set2) throws ExecutionException, InterruptedException {
        Map<String, ListConsumerGroupOffsetsSpec> specsByGroup = new HashMap<>();
        for (String groupId : set) {
            specsByGroup.put(groupId, new ListConsumerGroupOffsetsSpec().topicPartitions(set2));
        }
        ListConsumerGroupOffsetsResult offsetsResult = adminClient.listConsumerGroupOffsets(specsByGroup);
        Map<String, Map<TopicPartition, Long>> committedByGroup = new HashMap<>();
        for (String groupId : set) {
            offsetsResult.partitionsToOffsetAndMetadata(groupId).get().forEach((partition, offsetAndMetadata) -> {
                if (offsetAndMetadata != null) {
                    committedByGroup
                            .computeIfAbsent(groupId, id -> new HashMap<>())
                            .put(partition, offsetAndMetadata.offset());
                }
            });
        }
        return committedByGroup;
    }
}
