package io.americanexpress.synapse.subscriber.kafka.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.ext.XLogger;
import org.slf4j.ext.XLoggerFactory;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:io/americanexpress/synapse/subscriber/kafka/interceptor/BaseKafkaSubscriberMetricInterceptor.class */
public class BaseKafkaSubscriberMetricInterceptor<K, V> implements RecordInterceptor<K, V>, BatchInterceptor<K, V> {
    protected final XLogger log = XLoggerFactory.getXLogger(getClass());

    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> consumerRecord) {
        startMetricLog(consumerRecord);
        return preHandle(consumerRecord);
    }

    public void afterRecord(ConsumerRecord<K, V> consumerRecord, Consumer<K, V> consumer) {
        endMetricLog(consumerRecord);
        postHandle(consumerRecord, consumer);
    }

    protected ConsumerRecord<K, V> preHandle(ConsumerRecord<K, V> consumerRecord) {
        return consumerRecord;
    }

    protected void postHandle(ConsumerRecord<K, V> consumerRecord, Consumer<K, V> consumer) {
    }

    public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
        consumerRecords.iterator().forEachRemaining(this::startMetricLog);
        return preHandleBatch(consumerRecords);
    }

    public void success(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
        consumerRecords.iterator().forEachRemaining(this::endMetricLog);
        postHandleBatch(consumerRecords, consumer);
    }

    protected ConsumerRecords<K, V> preHandleBatch(ConsumerRecords<K, V> consumerRecords) {
        return consumerRecords;
    }

    protected void postHandleBatch(ConsumerRecords<K, V> consumerRecords, Consumer<K, V> consumer) {
    }

    private void startMetricLog(ConsumerRecord<K, V> consumerRecord) {
        consumerRecord.headers().add("startTime", String.valueOf(System.currentTimeMillis()).getBytes());
        this.log.info("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key()});
    }

    private void endMetricLog(ConsumerRecord<K, V> consumerRecord) {
        this.log.info("TOPIC: {}, PARTITION: {}, OFFSET: {}, KEY: {}, PROCESSING_TIME: {} ms.", new Object[]{consumerRecord.topic(), Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), consumerRecord.key(), Long.valueOf(System.currentTimeMillis() - Long.parseLong(new String(consumerRecord.headers().lastHeader("startTime").value(), StandardCharsets.UTF_8)))});
    }
}
