package org.axonframework.kafka.eventhandling.consumer;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MessageStream;
import org.axonframework.serialization.xml.XStreamSerializer;

/* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/AsyncFetcher.class */
public class AsyncFetcher<K, V> implements Fetcher<K, V> {
    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;
    private final ExecutorService pool;
    private final KafkaMessageConverter<K, V> converter;
    private final Consumer<K, V> consumer;
    private final String topic;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
    private final long pollTimeout;

    /* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/AsyncFetcher$Builder.class */
    public static final class Builder<K, V> {
        private final ConsumerFactory<K, V> consumerFactory;
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory;
        private KafkaMessageConverter<K, V> converter;
        private String topic;
        private long pollTimeout;
        private ExecutorService pool;
        private BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;

        private Builder(ConsumerFactory<K, V> consumerFactory) {
            this.bufferFactory = SortedKafkaMessageBuffer::new;
            this.converter = new DefaultKafkaMessageConverter(new XStreamSerializer());
            this.topic = "events";
            this.pollTimeout = 5000L;
            this.pool = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher-pool-thread"));
            this.callback = (consumerRecord, kafkaTrackingToken) -> {
                return null;
            };
            Assert.notNull(consumerFactory, () -> {
                return "ConsumerFactory may not be null";
            });
            this.consumerFactory = consumerFactory;
        }

        public Builder<K, V> withPool(ExecutorService executorService) {
            Assert.notNull(executorService, () -> {
                return "Pool may not be null";
            });
            this.pool = executorService;
            return this;
        }

        public Builder<K, V> onRecordPublished(BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction) {
            Assert.notNull(biFunction, () -> {
                return "Callback may not be null";
            });
            this.callback = biFunction;
            return this;
        }

        public Builder<K, V> withPollTimeout(long j, TimeUnit timeUnit) {
            this.pollTimeout = timeUnit.toMillis(j);
            return this;
        }

        public Builder<K, V> withMessageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            Assert.notNull(kafkaMessageConverter, () -> {
                return "Converter may not be null";
            });
            this.converter = kafkaMessageConverter;
            return this;
        }

        public Builder<K, V> withTopic(String str) {
            Assert.notNull(str, () -> {
                return "Topic may not be null";
            });
            this.topic = str;
            return this;
        }

        public Builder<K, V> withBufferFactory(Supplier<Buffer<KafkaEventMessage>> supplier) {
            Assert.notNull(supplier, () -> {
                return "Buffer factory may not be null";
            });
            this.bufferFactory = supplier;
            return this;
        }

        public Fetcher<K, V> build() {
            return new AsyncFetcher(this);
        }
    }

    private AsyncFetcher(Builder<K, V> builder) {
        this.bufferFactory = ((Builder) builder).bufferFactory;
        this.consumer = ((Builder) builder).consumerFactory.createConsumer();
        this.converter = ((Builder) builder).converter;
        this.topic = ((Builder) builder).topic;
        this.pool = ((Builder) builder).pool;
        this.callback = ((Builder) builder).callback;
        this.pollTimeout = ((Builder) builder).pollTimeout;
    }

    @Override // org.axonframework.kafka.eventhandling.consumer.Fetcher
    public MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken kafkaTrackingToken) {
        ConsumerUtil.seek(this.topic, this.consumer, kafkaTrackingToken);
        if (KafkaTrackingToken.isEmpty(kafkaTrackingToken)) {
            kafkaTrackingToken = KafkaTrackingToken.emptyToken();
        }
        Buffer<KafkaEventMessage> buffer = this.bufferFactory.get();
        Future<?> submit = this.pool.submit(FetchEventsTask.builder(this.consumer, kafkaTrackingToken, buffer, this.converter, this.callback, this.pollTimeout).build());
        return new KafkaMessageStream(buffer, () -> {
            submit.cancel(true);
        });
    }

    @Override // org.axonframework.kafka.eventhandling.consumer.Fetcher
    public void shutdown() {
        this.pool.shutdown();
    }

    public static <K, V> Builder<K, V> builder(Map<String, Object> map) {
        return builder(new DefaultConsumerFactory(map));
    }

    public static <K, V> Builder<K, V> builder(ConsumerFactory<K, V> consumerFactory) {
        return new Builder<>(consumerFactory);
    }
}
