package org.axonframework.kafka.eventhandling.consumer;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.axonframework.common.Assert;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/FetchEventsTask.class */
class FetchEventsTask<K, V> implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(FetchEventsTask.class);
    private final Consumer<K, V> consumer;
    private final Buffer<KafkaEventMessage> buffer;
    private final KafkaMessageConverter<K, V> converter;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
    private final long timeout;
    private KafkaTrackingToken currentToken;

    /* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/FetchEventsTask$Builder.class */
    public static class Builder<K, V> {
        private final long timeout;
        private final Consumer<K, V> consumer;
        private final Buffer<KafkaEventMessage> buffer;
        private final KafkaMessageConverter<K, V> converter;
        private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
        private final KafkaTrackingToken currentToken;

        public Builder(Consumer<K, V> consumer, KafkaTrackingToken kafkaTrackingToken, Buffer<KafkaEventMessage> buffer, KafkaMessageConverter<K, V> kafkaMessageConverter, BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction, long j) {
            Assert.notNull(consumer, () -> {
                return "Consumer may not be null";
            });
            Assert.notNull(buffer, () -> {
                return "Buffer may not be null";
            });
            Assert.notNull(kafkaMessageConverter, () -> {
                return "Converter may not be null";
            });
            Assert.notNull(kafkaTrackingToken, () -> {
                return "Token may not be null";
            });
            Assert.notNull(biFunction, () -> {
                return "Callback may not be null";
            });
            Assert.isFalse(j < 0, () -> {
                return "Timeout may not be < 0";
            });
            this.consumer = consumer;
            this.currentToken = kafkaTrackingToken;
            this.buffer = buffer;
            this.converter = kafkaMessageConverter;
            this.callback = biFunction;
            this.timeout = j;
        }

        public String toString() {
            return "Config{consumer=" + this.consumer + ", buffer=" + this.buffer + ", converter=" + this.converter + ", callback=" + this.callback + ", currentToken=" + this.currentToken + '}';
        }

        public FetchEventsTask<K, V> build() {
            return new FetchEventsTask<>(this);
        }
    }

    /* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/FetchEventsTask$CallbackEntry.class */
    private static class CallbackEntry<K, V> {
        private final KafkaTrackingToken token;
        private final ConsumerRecord<K, V> record;

        public CallbackEntry(KafkaTrackingToken kafkaTrackingToken, ConsumerRecord<K, V> consumerRecord) {
            this.token = kafkaTrackingToken;
            this.record = consumerRecord;
        }
    }

    private FetchEventsTask(Builder<K, V> builder) {
        this.consumer = ((Builder) builder).consumer;
        this.currentToken = ((Builder) builder).currentToken;
        this.buffer = ((Builder) builder).buffer;
        this.converter = ((Builder) builder).converter;
        this.callback = ((Builder) builder).callback;
        this.timeout = ((Builder) builder).timeout;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                try {
                    ConsumerRecords poll = this.consumer.poll(this.timeout);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Fetched {} records", Integer.valueOf(poll.count()));
                    }
                    ArrayList arrayList = new ArrayList(poll.count());
                    ArrayList<CallbackEntry> arrayList2 = new ArrayList(poll.count());
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord<K, V> consumerRecord = (ConsumerRecord) it.next();
                        this.converter.readKafkaMessage(consumerRecord).ifPresent(eventMessage -> {
                            KafkaTrackingToken advancedTo = this.currentToken.advancedTo(consumerRecord.partition(), consumerRecord.offset());
                            if (logger.isDebugEnabled()) {
                                logger.debug("Updating token from {} -> {}", this.currentToken, advancedTo);
                            }
                            this.currentToken = advancedTo;
                            arrayList.add(KafkaEventMessage.from(eventMessage, consumerRecord, this.currentToken));
                            arrayList2.add(new CallbackEntry(this.currentToken, consumerRecord));
                        });
                    }
                    try {
                        this.buffer.putAll(arrayList);
                        for (CallbackEntry callbackEntry : arrayList2) {
                            this.callback.apply(callbackEntry.record, callbackEntry.token);
                        }
                    } catch (InterruptedException e) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Event producer thread was interrupted. Shutting down.", e);
                        }
                        Thread.currentThread().interrupt();
                    }
                } catch (Exception e2) {
                    logger.error("Cannot proceed with Fetching, encountered {} ", e2);
                    this.consumer.close();
                    return;
                }
            } finally {
                this.consumer.close();
            }
        }
    }

    public static <K, V> Builder<K, V> builder(Consumer<K, V> consumer, KafkaTrackingToken kafkaTrackingToken, Buffer<KafkaEventMessage> buffer, KafkaMessageConverter<K, V> kafkaMessageConverter, BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction, long j) {
        return new Builder<>(consumer, kafkaTrackingToken, buffer, kafkaMessageConverter, biFunction, j);
    }
}
