package io.axual.client.consumer.base;

import io.axual.client.config.BaseConsumerConfig;
import io.axual.client.config.DeliveryStrategy;
import io.axual.client.exception.NoExistingStreamException;
import io.axual.client.proxy.axual.consumer.AxualConsumer;
import io.axual.client.proxy.generic.consumer.ConsumerProxy;
import io.axual.common.config.ClientConfig;
import io.axual.common.tools.KafkaUtil;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axual/client/consumer/base/BaseMessageSource.class */
public abstract class BaseMessageSource<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseMessageSource.class);
    private ConsumerProxy<K, V> consumer = null;
    private final ClientConfig clientConfig;
    private final BaseConsumerConfig<K, V> consumerConfig;
    private final CommitStrategy<K, V> commitStrategy;

    public BaseMessageSource(ClientConfig clientConfig, final BaseConsumerConfig<K, V> baseConsumerConfig) {
        this.clientConfig = clientConfig;
        this.consumerConfig = baseConsumerConfig;
        Committer<K, V> committer = new Committer<K, V>() { // from class: io.axual.client.consumer.base.BaseMessageSource.1
            private final AtomicLong lastCommitId = new AtomicLong(0);
            private Map<TopicPartition, OffsetAndMetadata> processed = new HashMap();

            @Override // io.axual.client.consumer.base.Committer
            public void markAsProcessed(BaseMessage<K, V> baseMessage) {
                this.processed.put(new TopicPartition(baseMessage.getRecord().topic(), baseMessage.getRecord().partition()), new OffsetAndMetadata(baseMessage.getRecord().offset() + 1, "{\"strategy\":\"" + baseConsumerConfig.getDeliveryStrategy().name() + "\"}"));
            }

            @Override // io.axual.client.consumer.base.Committer
            public void commitProcessedOffsets(boolean z) {
                if (this.processed.isEmpty() || BaseMessageSource.this.consumer == null) {
                    return;
                }
                long incrementAndGet = this.lastCommitId.incrementAndGet();
                if (z) {
                    BaseMessageSource.LOG.debug("Commit sync: id={}", Long.valueOf(incrementAndGet));
                    BaseMessageSource.this.consumer.commitSync(this.processed);
                } else {
                    BaseMessageSource.LOG.debug("Commit async: id={}, partition_count={}", Long.valueOf(incrementAndGet), Integer.valueOf(this.processed.size()));
                    BaseMessageSource.this.consumer.commitAsync(this.processed, (map, exc) -> {
                        if (exc == null) {
                            BaseMessageSource.LOG.debug("Commit successful: id={}", Long.valueOf(incrementAndGet));
                        } else {
                            BaseMessageSource.LOG.debug("Commit not successful: id={}", Long.valueOf(incrementAndGet), exc);
                        }
                    });
                }
                this.processed.clear();
            }
        };
        if (baseConsumerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_MOST_ONCE) {
            this.commitStrategy = new CommitStrategyAMO(committer);
        } else {
            this.commitStrategy = new CommitStrategyALO(committer);
        }
    }

    public BaseConsumerConfig<K, V> getConsumerConfig() {
        return this.consumerConfig;
    }

    public String getInfo() {
        return "stream = " + this.consumerConfig.getStream();
    }

    public List<BaseMessage<K, V>> getMessages() {
        if (this.consumer == null) {
            Map<String, Object> consumerConfigs = getConsumerConfigs();
            consumerConfigs.put("axualconsumer.chain", this.consumerConfig.getProxyChain());
            LOG.debug("Creating a new Axual consumer with properties: {}", consumerConfigs);
            this.consumer = new AxualConsumer(consumerConfigs);
            LOG.debug("Created a new Axual consumer");
            List partitionsFor = this.consumer.partitionsFor(this.consumerConfig.getStream());
            if (partitionsFor == null || partitionsFor.isEmpty()) {
                throw new NoExistingStreamException("No partitions found for stream", this.consumerConfig.getStream());
            }
            this.consumer.subscribe(Collections.singletonList(this.consumerConfig.getStream()));
            LOG.debug("Subscribed consumer to stream: {}", this.consumerConfig.getStream());
        }
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(100L));
        ArrayList arrayList = new ArrayList(poll.count());
        LOG.debug("Poll retrieved {} messages", Integer.valueOf(poll.count()));
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            arrayList.add(new BaseMessage((ConsumerRecord) it.next()));
        }
        this.commitStrategy.onAfterFetchBatch(arrayList);
        return arrayList;
    }

    public void close() {
        this.commitStrategy.close();
        LOG.info("Closing the Kafka consumer");
        if (this.consumer == null) {
            LOG.info("Kafka consumer was already closed");
            return;
        }
        this.consumer.close();
        this.consumer = null;
        LOG.info("Closed the Kafka consumer");
    }

    protected Map<String, Object> getConsumerConfigs() {
        Map<String, Object> kafkaConfigs = KafkaUtil.getKafkaConfigs(this.clientConfig);
        kafkaConfigs.put("key.deserializer", this.consumerConfig.getKeyDeserializer());
        kafkaConfigs.put("value.deserializer", this.consumerConfig.getValueDeserializer());
        kafkaConfigs.put("enable.auto.commit", "false");
        kafkaConfigs.put("auto.offset.reset", this.consumerConfig.getDeliveryStrategy() == DeliveryStrategy.AT_LEAST_ONCE ? "earliest" : "latest");
        kafkaConfigs.put("max.poll.records", this.consumerConfig.getMaximumPollSize().toString());
        return kafkaConfigs;
    }

    public void onAfterProcessBatch() {
        this.commitStrategy.onAfterProcessBatch();
    }

    public void onAfterProcessMessage(BaseMessage<K, V> baseMessage, Throwable th) {
        this.commitStrategy.onAfterProcessMessage(baseMessage, th);
    }
}
