package io.streamthoughts.jikkou.kafka.internals.consumer;

import io.streamthoughts.jikkou.kafka.internals.KafkaRecord;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/jikkou/kafka/internals/consumer/KafkaLogToEndConsumer.class */
public class KafkaLogToEndConsumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaLogToEndConsumer.class);
    private final ConsumerFactory<K, V> consumerFactory;

    public KafkaLogToEndConsumer(@NotNull ConsumerFactory<K, V> consumerFactory) {
        this.consumerFactory = (ConsumerFactory) Objects.requireNonNull(consumerFactory, "consumer must not be null");
    }

    public void readTopicToEnd(@NotNull String str, @NotNull ConsumerRecordCallback<K, V> consumerRecordCallback) {
        Consumer<K, V> createConsumer = this.consumerFactory.createConsumer();
        try {
            List list = createConsumer.partitionsFor(str).stream().map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).toList();
            createConsumer.assign(list);
            createConsumer.seekToBeginning(list);
            readToPartitionEnd(createConsumer, consumerRecordCallback);
            if (createConsumer != null) {
                createConsumer.close();
            }
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void readToPartitionEnd(Consumer<K, V> consumer, ConsumerRecordCallback<K, V> consumerRecordCallback) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumer.assignment());
        LOG.info("Reading to end of partitions offsets {}", endOffsets);
        while (!endOffsets.isEmpty()) {
            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
            while (true) {
                if (it.hasNext()) {
                    Map.Entry<TopicPartition, Long> next = it.next();
                    TopicPartition key = next.getKey();
                    if (consumer.position(key) < next.getValue().longValue()) {
                        pollOnce(consumer, consumerRecordCallback);
                        break;
                    } else {
                        LOG.info("Finished read to end partition for {}-{}", key.topic(), Integer.valueOf(key.partition()));
                        it.remove();
                    }
                }
            }
        }
    }

    private void pollOnce(Consumer<K, V> consumer, ConsumerRecordCallback<K, V> consumerRecordCallback) {
        Iterator<ConsumerRecord<K, V>> it = consumer.poll(Duration.ofMillis(NetworkClientDelegate.PollResult.WAIT_FOREVER)).iterator();
        while (it.hasNext()) {
            consumerRecordCallback.accept(KafkaRecord.of(it.next()));
        }
    }
}
