package org.axonframework.kafka.eventhandling.consumer;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.axonframework.common.Assert;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.messaging.MessageStream;
import org.axonframework.serialization.xml.XStreamSerializer;

/* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/AsyncFetcher.class */
public class AsyncFetcher<K, V> implements Fetcher {
    private final Supplier<Buffer<KafkaEventMessage>> bufferFactory;
    private final ExecutorService pool;
    private final KafkaMessageConverter<K, V> converter;
    private final ConsumerFactory<K, V> consumerFactory;
    private final String topic;
    private final BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
    private final long pollTimeout;
    private final boolean requirePoolShutdown;
    private final Set<FetchEventsTask> activeFetchers;

    /* loaded from: input_file:org/axonframework/kafka/eventhandling/consumer/AsyncFetcher$Builder.class */
    public static final class Builder<K, V> {
        private final ConsumerFactory<K, V> consumerFactory;
        private Supplier<Buffer<KafkaEventMessage>> bufferFactory;
        private KafkaMessageConverter<K, V> converter;
        private String topic;
        private long pollTimeout;
        private ExecutorService pool;
        private BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> callback;
        private boolean requirePoolShutdown;

        private Builder(ConsumerFactory<K, V> consumerFactory) {
            this.bufferFactory = SortedKafkaMessageBuffer::new;
            this.converter = new DefaultKafkaMessageConverter(new XStreamSerializer());
            this.topic = "Axon.Events";
            this.pollTimeout = 5000L;
            this.pool = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher-pool-thread"));
            this.callback = (consumerRecord, kafkaTrackingToken) -> {
                return null;
            };
            this.requirePoolShutdown = true;
            Assert.notNull(consumerFactory, () -> {
                return "ConsumerFactory may not be null";
            });
            this.consumerFactory = consumerFactory;
        }

        public Builder<K, V> withPool(ExecutorService executorService) {
            Assert.notNull(executorService, () -> {
                return "Pool may not be null";
            });
            this.requirePoolShutdown = false;
            this.pool = executorService;
            return this;
        }

        public Builder<K, V> onRecordPublished(BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction) {
            Assert.notNull(biFunction, () -> {
                return "Callback may not be null";
            });
            this.callback = biFunction;
            return this;
        }

        public Builder<K, V> withPollTimeout(long j, TimeUnit timeUnit) {
            this.pollTimeout = timeUnit.toMillis(j);
            return this;
        }

        public Builder<K, V> withMessageConverter(KafkaMessageConverter<K, V> kafkaMessageConverter) {
            Assert.notNull(kafkaMessageConverter, () -> {
                return "Converter may not be null";
            });
            this.converter = kafkaMessageConverter;
            return this;
        }

        public Builder<K, V> withTopic(String str) {
            Assert.notNull(str, () -> {
                return "Topic may not be null";
            });
            this.topic = str;
            return this;
        }

        public Builder<K, V> withBufferFactory(Supplier<Buffer<KafkaEventMessage>> supplier) {
            Assert.notNull(supplier, () -> {
                return "Buffer factory may not be null";
            });
            this.bufferFactory = supplier;
            return this;
        }

        public Fetcher build() {
            return new AsyncFetcher(this);
        }
    }

    private AsyncFetcher(Builder<K, V> builder) {
        this.activeFetchers = ConcurrentHashMap.newKeySet();
        this.bufferFactory = ((Builder) builder).bufferFactory;
        this.consumerFactory = ((Builder) builder).consumerFactory;
        this.converter = ((Builder) builder).converter;
        this.topic = ((Builder) builder).topic;
        this.requirePoolShutdown = ((Builder) builder).requirePoolShutdown;
        this.pool = ((Builder) builder).pool;
        this.callback = ((Builder) builder).callback;
        this.pollTimeout = ((Builder) builder).pollTimeout;
    }

    public static <K, V> Builder<K, V> builder(Map<String, Object> map) {
        return builder(new DefaultConsumerFactory(map));
    }

    public static <K, V> Builder<K, V> builder(ConsumerFactory<K, V> consumerFactory) {
        return new Builder<>(consumerFactory);
    }

    @Override // org.axonframework.kafka.eventhandling.consumer.Fetcher
    public MessageStream<TrackedEventMessage<?>> start(KafkaTrackingToken kafkaTrackingToken) {
        Consumer<K, V> createConsumer = this.consumerFactory.createConsumer();
        ConsumerUtil.seek(this.topic, createConsumer, kafkaTrackingToken);
        if (KafkaTrackingToken.isEmpty(kafkaTrackingToken)) {
            kafkaTrackingToken = KafkaTrackingToken.emptyToken();
        }
        Buffer<KafkaEventMessage> buffer = this.bufferFactory.get();
        KafkaMessageConverter<K, V> kafkaMessageConverter = this.converter;
        BiFunction<ConsumerRecord<K, V>, KafkaTrackingToken, Void> biFunction = this.callback;
        long j = this.pollTimeout;
        Set<FetchEventsTask> set = this.activeFetchers;
        set.getClass();
        FetchEventsTask fetchEventsTask = new FetchEventsTask(createConsumer, kafkaTrackingToken, buffer, kafkaMessageConverter, biFunction, j, (v1) -> {
            r8.remove(v1);
        });
        this.activeFetchers.add(fetchEventsTask);
        this.pool.execute(fetchEventsTask);
        fetchEventsTask.getClass();
        return new KafkaMessageStream(buffer, fetchEventsTask::close);
    }

    @Override // org.axonframework.kafka.eventhandling.consumer.Fetcher
    public void shutdown() {
        this.activeFetchers.forEach((v0) -> {
            v0.close();
        });
        if (this.requirePoolShutdown) {
            this.pool.shutdown();
        }
    }
}
