package io.streamthoughts.azkarra.api.streams.consumer;

import io.streamthoughts.azkarra.api.time.Time;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/consumer/MonitorOffsetsConsumerInterceptor.class */
public class MonitorOffsetsConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
    private final GlobalConsumerOffsetsRegistry registry = GlobalConsumerOffsetsRegistry.getInstance();
    private ConsumerGroupOffsetsState consumerGroupOffsets;
    private String clientId;

    public void configure(Map<String, ?> map) {
        String str = (String) map.get("group.id");
        this.clientId = (String) map.get("client.id");
        this.consumerGroupOffsets = this.registry.offsetsFor(str);
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> consumerRecords) {
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            for (ConsumerRecord consumerRecord : consumerRecords.records(topicPartition)) {
                OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(consumerRecord.offset(), consumerRecord.timestamp());
                this.consumerGroupOffsets.update(topicPartition, consumerThreadKey(), consumerLogOffsets -> {
                    return consumerLogOffsets.consumedOffset(offsetAndTimestamp);
                });
            }
        }
        return consumerRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        long milliseconds = Time.SYSTEM.milliseconds();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(entry.getValue().offset(), milliseconds);
            this.consumerGroupOffsets.update(key, consumerThreadKey(), consumerLogOffsets -> {
                return consumerLogOffsets.committedOffset(offsetAndTimestamp);
            });
        }
    }

    private ConsumerThreadKey consumerThreadKey() {
        return new ConsumerThreadKey(Thread.currentThread().getName(), this.clientId);
    }

    public void close() {
    }
}
