package in.rcard.kafkaesque.consumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;

/* loaded from: input_file:in/rcard/kafkaesque/consumer/KafkaesqueConsumer.class */
public class KafkaesqueConsumer<Key, Value> {
    private final KafkaConsumer<Key, Value> kafkaConsumer;
    private final long interval;
    private final TimeUnit timeUnit;
    private final int emptyPollsCount;
    private final long emptyPollsInterval;
    private final TimeUnit emptyPollsTimeUnit;
    private final DelegateCreationInfo<Key, Value> creationInfo;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:in/rcard/kafkaesque/consumer/KafkaesqueConsumer$DelegateCreationInfo.class */
    public static class DelegateCreationInfo<Key, Value> {
        private final String topic;
        private final Deserializer<Key> keyDeserializer;
        private final Deserializer<Value> valueDeserializer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public DelegateCreationInfo(String str, Deserializer<Key> deserializer, Deserializer<Value> deserializer2) {
            this.topic = str;
            this.keyDeserializer = deserializer;
            this.valueDeserializer = deserializer2;
        }

        public String getTopic() {
            return this.topic;
        }

        public Deserializer<Key> getKeyDeserializer() {
            return this.keyDeserializer;
        }

        public Deserializer<Value> getValueDeserializer() {
            return this.valueDeserializer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaesqueConsumer(String str, long j, TimeUnit timeUnit, int i, long j2, TimeUnit timeUnit2, DelegateCreationInfo<Key, Value> delegateCreationInfo) {
        this.interval = j;
        this.timeUnit = timeUnit;
        this.emptyPollsCount = i;
        this.emptyPollsInterval = j2;
        this.emptyPollsTimeUnit = timeUnit2;
        this.creationInfo = delegateCreationInfo;
        this.kafkaConsumer = createKafkaConsumer(str);
    }

    private KafkaConsumer<Key, Value> createKafkaConsumer(String str) {
        Properties properties = new Properties();
        properties.put("group.id", "kafkaesque-consumer");
        properties.put("auto.offset.reset", "earliest");
        properties.put("bootstrap.servers", str);
        properties.put("enable.auto.commit", "true");
        properties.put("key.deserializer", this.creationInfo.getKeyDeserializer().getClass());
        properties.put("value.deserializer", this.creationInfo.getValueDeserializer().getClass());
        KafkaConsumer<Key, Value> kafkaConsumer = new KafkaConsumer<>(properties);
        subscribeConsumerToTopic(kafkaConsumer, this.creationInfo.getTopic());
        return kafkaConsumer;
    }

    private void subscribeConsumerToTopic(KafkaConsumer<Key, Value> kafkaConsumer, String str) {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        kafkaConsumer.subscribe(List.of(str), new ConsumerRebalanceListener() { // from class: in.rcard.kafkaesque.consumer.KafkaesqueConsumer.1
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                countDownLatch.countDown();
            }
        });
        Awaitility.await().atMost(1L, TimeUnit.MINUTES).until(() -> {
            try {
                kafkaConsumer.poll(Duration.ofMillis(100L));
            } catch (Exception e) {
            }
            boolean z = countDownLatch.getCount() == 0;
            if (z) {
                kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
            }
            return Boolean.valueOf(z);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AssertionsOnConsumedDelegate<Key, Value> poll() {
        ArrayList arrayList = new ArrayList();
        try {
            AtomicInteger atomicInteger = new AtomicInteger(this.emptyPollsCount);
            Awaitility.await().atMost(this.interval, this.timeUnit).pollInterval(this.emptyPollsInterval, this.emptyPollsTimeUnit).until(() -> {
                if (isEmptyPollAfterSomeMessagesWereRead(arrayList)) {
                    return Boolean.valueOf(atomicInteger.decrementAndGet() == 0);
                }
                return false;
            });
            return new AssertionsOnConsumedDelegate<>(new AssertionsOnConsumed(arrayList), this);
        } catch (Exception e) {
            throw new KafkaesqueConsumerPollException("Error during the poll operation", e);
        } catch (ConditionTimeoutException e2) {
            if (arrayList.isEmpty()) {
                throw new AssertionError(String.format("The consumer cannot find any message during the given time interval: %d %s", Long.valueOf(this.interval), this.timeUnit.toString()));
            }
            throw new AssertionError(String.format("The consumer reads new messages until the end of the given time interval: %d %s", Long.valueOf(this.interval), this.timeUnit.toString()));
        }
    }

    private boolean isEmptyPollAfterSomeMessagesWereRead(List<ConsumerRecord<Key, Value>> list) {
        return readNewMessages(list) == 0 && !list.isEmpty();
    }

    private int readNewMessages(List<ConsumerRecord<Key, Value>> list) {
        ConsumerRecords poll = this.kafkaConsumer.poll(Duration.ofMillis(50L));
        ArrayList arrayList = new ArrayList();
        Iterable records = poll.records(this.creationInfo.getTopic());
        Objects.requireNonNull(arrayList);
        records.forEach((v1) -> {
            r1.add(v1);
        });
        if (!arrayList.isEmpty()) {
            list.addAll(arrayList);
        }
        return arrayList.size();
    }

    public void close() {
        this.kafkaConsumer.close();
    }
}
