package io.quarkus.kafka.client.runtime.ui;

import io.quarkus.kafka.client.runtime.KafkaAdminClient;
import io.quarkus.kafka.client.runtime.ui.model.Order;
import io.quarkus.kafka.client.runtime.ui.model.converter.KafkaModelConverter;
import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest;
import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessagePage;
import io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory;
import io.smallrye.common.annotation.Identifier;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;

/**
 * Reads pages of messages from, and publishes single messages to, Kafka topics.
 *
 * <p>Every read creates a short-lived consumer per partition through {@link ConsumerFactory} and
 * every write creates a short-lived producer; all of them are closed before the call returns.
 */
@ApplicationScoped
public class KafkaTopicClient {
    /** Number of additional polls attempted after an empty first poll. */
    private static final int RETRIES = 3;

    /** Timeout of a single {@code poll} call. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(100L);

    @Inject
    KafkaAdminClient adminClient;
    KafkaModelConverter modelConverter = new KafkaModelConverter();

    @Inject
    @Identifier("default-kafka-broker")
    Map<String, Object> config;

    /**
     * Builds a producer from a copy of the injected configuration, overriding the client id
     * (randomised per producer) and forcing {@link BytesSerializer} for keys and values.
     */
    private Producer<Bytes, Bytes> createProducer() {
        Map<String, Object> producerConfig = new HashMap<>(this.config);
        producerConfig.put("client.id", "kafka-ui-producer-" + UUID.randomUUID());
        producerConfig.put("key.serializer", BytesSerializer.class.getName());
        producerConfig.put("value.serializer", BytesSerializer.class.getName());
        return new KafkaProducer<>(producerConfig);
    }

    /**
     * Reads one page of messages from the requested partitions of a topic.
     *
     * <p>Up to {@code i} records are fetched from each partition, the combined result is sorted by
     * record timestamp (descending for {@link Order#NEW_FIRST}) and truncated to {@code i} entries.
     *
     * @param str topic name
     * @param order read direction; {@code null} is treated as {@link Order#OLD_FIRST}
     * @param map current offset per requested partition; must be non-empty with non-negative values
     * @param i page size; must be greater than zero
     * @return the converted messages together with the per-partition offsets for the next page
     * @throws IllegalArgumentException if the parameters are invalid or a requested partition does
     *         not exist
     */
    public KafkaMessagePage getTopicMessages(String str, Order order, Map<Integer, Long> map, int i) throws ExecutionException, InterruptedException {
        assertParamsValid(i, map);
        Set<Integer> requestedPartitions = map.keySet();
        assertRequestedPartitionsExist(str, requestedPartitions);
        if (order == null) {
            order = Order.OLD_FIRST;
        }

        List<ConsumerRecord<Bytes, Bytes>> records = getConsumerRecords(str, order, i, requestedPartitions, map, i);

        Comparator<ConsumerRecord<Bytes, Bytes>> byTimestamp = Comparator.comparing(ConsumerRecord::timestamp);
        if (Order.NEW_FIRST == order) {
            byTimestamp = byTimestamp.reversed();
        }
        records.sort(byTimestamp);

        // Each partition may have contributed a full page; keep only one page overall.
        if (records.size() > i) {
            records = records.subList(0, i);
        }

        Map<Integer, Long> newOffsets = calculateNewPartitionOffset(map, records, order, str);
        return new KafkaMessagePage(newOffsets,
                records.stream().map(this.modelConverter::convert).collect(Collectors.toList()));
    }

    /**
     * Validates the paging parameters.
     *
     * @throws IllegalArgumentException if the page size is not positive, the offset map is null or
     *         empty, or any offset is null or negative
     */
    private void assertParamsValid(int pageSize, Map<Integer, Long> partitionOffsets) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("Page size must be > 0.");
        }
        if (partitionOffsets == null || partitionOffsets.isEmpty()) {
            throw new IllegalArgumentException("Partition offset map must be specified.");
        }
        for (Long offset : partitionOffsets.values()) {
            if (offset == null) {
                throw new IllegalArgumentException("Partition offset must be specified for every partition.");
            }
            // Zero is a valid offset; only negative values are rejected.
            if (offset < 0) {
                throw new IllegalArgumentException("Partition offset must be >= 0.");
            }
        }
    }

    /**
     * Polls the consumer, retrying up to {@link #RETRIES} more times while the result is empty.
     *
     * @return the first non-empty batch, or the last (empty) batch if every attempt was empty
     */
    private ConsumerRecords<Bytes, Bytes> pollWhenReady(Consumer<Bytes, Bytes> consumer) {
        ConsumerRecords<Bytes, Bytes> batch = consumer.poll(POLL_TIMEOUT);
        for (int attempt = 0; batch.isEmpty() && attempt < RETRIES; attempt++) {
            batch = consumer.poll(POLL_TIMEOUT);
        }
        return batch;
    }

    /**
     * Computes the per-partition offsets for the next page: the old offset moved by the number of
     * records of that partition contained in the page, forwards for {@link Order#OLD_FIRST} and
     * backwards otherwise.
     *
     * @param topicName unused; kept so the signature stays unchanged
     */
    private Map<Integer, Long> calculateNewPartitionOffset(Map<Integer, Long> oldPartitionOffset,
            Collection<ConsumerRecord<Bytes, Bytes>> records, Order order, String topicName) {
        Map<Integer, Long> readPerPartition = records.stream()
                .map(ConsumerRecord::partition)
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        long direction = Order.OLD_FIRST == order ? 1L : -1L;
        Map<Integer, Long> newOffsets = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : oldPartitionOffset.entrySet()) {
            long consumed = readPerPartition.getOrDefault(entry.getKey(), 0L);
            newOffsets.put(entry.getKey(), entry.getValue() + direction * consumed);
        }
        return newOffsets;
    }

    /**
     * Returns the consumer position of a partition after seeking to its end
     * ({@link Order#NEW_FIRST}) or to its beginning (any other order).
     */
    private long getPosition(String topicName, int partitionId, Order order) {
        try (Consumer<Bytes, Bytes> consumer = ConsumerFactory.createConsumer(topicName, Integer.valueOf(partitionId), this.config)) {
            TopicPartition partition = new TopicPartition(topicName, partitionId);
            if (Order.NEW_FIRST == order) {
                consumer.seekToEnd(List.of(partition));
            } else {
                consumer.seekToBeginning(List.of(partition));
            }
            return consumer.position(partition);
        }
    }

    /**
     * Determines the starting offset of the first page for each requested partition.
     *
     * @param str topic name
     * @param collection ids of the partitions to inspect
     * @param order {@link Order#NEW_FIRST} yields end positions, anything else beginning positions
     * @return starting offset keyed by partition id
     * @throws IllegalArgumentException if a requested partition does not exist
     */
    public Map<Integer, Long> getPagePartitionOffset(String str, Collection<Integer> collection, Order order) throws ExecutionException, InterruptedException {
        assertRequestedPartitionsExist(str, collection);
        Map<Integer, Long> offsets = new HashMap<>();
        for (Integer partitionId : collection) {
            offsets.put(partitionId, getPosition(str, partitionId, order));
        }
        return offsets;
    }

    /**
     * Reads up to {@code totalMessages} records from each requested partition.
     *
     * <p>For {@link Order#OLD_FIRST} reading starts at the partition's offset from {@code start};
     * otherwise it starts {@code pageSize} records before that offset, clamped at zero.
     *
     * @return the records of all partitions, concatenated in partition iteration order
     */
    private List<ConsumerRecord<Bytes, Bytes>> getConsumerRecords(String topicName, Order order, int pageSize,
            Collection<Integer> requestedPartitions, Map<Integer, Long> start, int totalMessages) {
        List<ConsumerRecord<Bytes, Bytes>> allPartitionsResult = new ArrayList<>();
        for (Integer requestedPartition : requestedPartitions) {
            List<ConsumerRecord<Bytes, Bytes>> partitionResult = new ArrayList<>();
            Long offset = start.get(requestedPartition);
            try (Consumer<Bytes, Bytes> consumer = ConsumerFactory.createConsumer(topicName, requestedPartition, this.config)) {
                TopicPartition partition = new TopicPartition(topicName, requestedPartition);
                long seekedOffset = Order.OLD_FIRST == order ? offset : Long.max(offset - pageSize, 0L);
                consumer.seek(partition, seekedOffset);

                int readSoFar = 0;
                boolean keepOnReading = true;
                while (keepOnReading) {
                    ConsumerRecords<Bytes, Bytes> batch = pollWhenReady(consumer);
                    if (batch.isEmpty()) {
                        keepOnReading = false;
                    }
                    // Bounded iteration: stops when the batch is exhausted or the limit is reached.
                    for (ConsumerRecord<Bytes, Bytes> consumerRecord : batch) {
                        readSoFar++;
                        partitionResult.add(consumerRecord);
                        if (readSoFar >= totalMessages) {
                            keepOnReading = false;
                            break;
                        }
                    }
                }

                // When the backwards seek was clamped to 0, more records than lie before the
                // requested offset may have been read; keep only the first `offset` by timestamp.
                if (Order.NEW_FIRST == order && seekedOffset == 0 && partitionResult.size() > offset.intValue()) {
                    partitionResult.sort(Comparator.comparing(ConsumerRecord::timestamp));
                    partitionResult = partitionResult.subList(0, offset.intValue());
                }
            }
            allPartitionsResult.addAll(partitionResult);
        }
        return allPartitionsResult;
    }

    /**
     * Verifies that every requested partition id exists on the topic.
     *
     * @throws IllegalArgumentException if at least one requested partition is unknown
     */
    private void assertRequestedPartitionsExist(String topicName, Collection<Integer> requestedPartitions) throws InterruptedException, ExecutionException {
        List<Integer> existingPartitions = partitions(topicName);
        if (!new HashSet<>(existingPartitions).containsAll(requestedPartitions)) {
            throw new IllegalArgumentException(String.format("Requested messages from partition, that do not exist. Requested partitions: %s. Existing partitions: %s", requestedPartitions, existingPartitions));
        }
    }

    /**
     * Publishes a single message described by the request.
     *
     * <p>Key and value are encoded as UTF-8. A {@code null} key is sent as a keyless record. The
     * result of the asynchronous {@code send} is not inspected; the producer is closed before
     * returning.
     *
     * @param kafkaMessageCreateRequest topic, partition, key and value of the message
     * @throws NullPointerException if the request's value is {@code null}
     */
    public void createMessage(KafkaMessageCreateRequest kafkaMessageCreateRequest) {
        String value = Objects.requireNonNull(kafkaMessageCreateRequest.getValue(), "Message value must not be null");
        ProducerRecord<Bytes, Bytes> producerRecord = new ProducerRecord<>(
                kafkaMessageCreateRequest.getTopic(),
                kafkaMessageCreateRequest.getPartition(),
                toBytes(kafkaMessageCreateRequest.getKey()),
                toBytes(value));
        try (Producer<Bytes, Bytes> producer = createProducer()) {
            producer.send(producerRecord);
        }
    }

    /** Encodes text as UTF-8 {@link Bytes}; {@code null} stays {@code null}. */
    private static Bytes toBytes(String text) {
        return text == null ? null : Bytes.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Lists the partition ids of a topic.
     *
     * @param str topic name
     * @return partition ids in the order reported by the topic description
     * @throws IllegalStateException if the admin client returns no description, or more than one,
     *         for the single requested topic
     */
    public List<Integer> partitions(String str) throws ExecutionException, InterruptedException {
        return this.adminClient.describeTopics(List.of(str)).values().stream()
                .reduce((first, second) -> {
                    throw new IllegalStateException("Requested info about single topic, but got result of multiple: " + first + ", " + second);
                })
                .orElseThrow(() -> new IllegalStateException("Requested info about a topic, but nothing found. Topic name: " + str))
                .partitions().stream()
                .map(partitionInfo -> partitionInfo.partition())
                .collect(Collectors.toList());
    }
}
