package org.axonframework.extensions.kafka.eventhandling.consumer;

import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.class */
public class AsyncFetcher<K, V, E> implements Fetcher<K, V, E> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final int DEFAULT_POLL_TIMEOUT_MS = 5000;
    private final Duration pollTimeout;
    private final ExecutorService executorService;
    private final boolean requirePoolShutdown;
    private final Set<FetchEventsTask<K, V, E>> activeFetchers = ConcurrentHashMap.newKeySet();

    /* loaded from: input_file:org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher$Builder.class */
    public static final class Builder<K, V, E> {
        private Duration pollTimeout = Duration.ofMillis(5000);
        private ExecutorService executorService = Executors.newCachedThreadPool(new AxonThreadFactory("AsyncFetcher"));
        private boolean requirePoolShutdown = true;

        public Builder<K, V, E> pollTimeout(long j) {
            BuilderUtils.assertThat(Long.valueOf(j), l -> {
                return l.longValue() > 0;
            }, "The poll timeout may not be negative [" + j + "]");
            this.pollTimeout = Duration.ofMillis(j);
            return this;
        }

        public Builder<K, V, E> executorService(ExecutorService executorService) {
            BuilderUtils.assertNonNull(executorService, "ExecutorService may not be null");
            this.requirePoolShutdown = false;
            this.executorService = executorService;
            return this;
        }

        public AsyncFetcher<K, V, E> build() {
            return new AsyncFetcher<>(this);
        }
    }

    public static <K, V, E> Builder<K, V, E> builder() {
        return new Builder<>();
    }

    protected AsyncFetcher(Builder<K, V, E> builder) {
        this.pollTimeout = ((Builder) builder).pollTimeout;
        this.executorService = ((Builder) builder).executorService;
        this.requirePoolShutdown = ((Builder) builder).requirePoolShutdown;
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
    public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter, EventConsumer<E> eventConsumer) {
        Duration duration = this.pollTimeout;
        Set<FetchEventsTask<K, V, E>> set = this.activeFetchers;
        set.getClass();
        FetchEventsTask<K, V, E> fetchEventsTask = new FetchEventsTask<>(consumer, duration, recordConverter, eventConsumer, (v1) -> {
            r6.remove(v1);
        });
        this.activeFetchers.add(fetchEventsTask);
        this.executorService.execute(fetchEventsTask);
        return () -> {
            fetchEventsTask.close();
            return true;
        };
    }

    @Override // org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher
    public void shutdown() {
        logger.info("Shutting down AsyncFetcher");
        this.activeFetchers.forEach((v0) -> {
            v0.close();
        });
        if (this.requirePoolShutdown) {
            this.executorService.shutdown();
        }
    }
}
