package org.creekservice.internal.kafka.streams.extension.observation;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.creekservice.api.kafka.streams.extension.observation.KafkaMetricsFilter;
import org.creekservice.api.kafka.streams.extension.observation.KafkaMetricsPublisherOptions;
import org.creekservice.api.observability.logging.structured.LogEntryCustomizer;
import org.creekservice.api.observability.logging.structured.StructuredLogger;
import org.creekservice.api.observability.logging.structured.StructuredLoggerFactory;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/extension/observation/DefaultMetricsPublisher.class */
/**
 * {@link MetricsPublisher} that periodically writes Kafka metrics to a structured logger.
 *
 * <p>Each publication is a single {@code "Kafka metrics"} log entry. Every metric accepted by the
 * configured {@link KafkaMetricsFilter} is added to that entry under a namespace built from the
 * metric's group followed by its filtered tags in sorted order. All namespace and key names are
 * sanitized so that they contain only ASCII letters, digits and underscores.
 */
public final class DefaultMetricsPublisher implements MetricsPublisher {
    private static final StructuredLogger LOGGER = StructuredLoggerFactory.internalLogger(DefaultMetricsPublisher.class);

    /** Matches any single character that is not an ASCII letter or digit. */
    private static final Pattern SANITIZE_PATTERN = Pattern.compile("[^A-Za-z0-9]");

    private final KafkaMetricsPublisherOptions options;
    private final StructuredLogger logger;
    private final ScheduledExecutorService scheduler;

    /**
     * Creates a publisher that logs via this class's own logger and runs on a dedicated
     * single-threaded scheduler whose thread is named {@code creek-metrics-publisher}.
     *
     * @param kafkaMetricsPublisherOptions the publishing options; must not be {@code null}.
     */
    public DefaultMetricsPublisher(KafkaMetricsPublisherOptions kafkaMetricsPublisherOptions) {
        this(
                kafkaMetricsPublisherOptions,
                LOGGER,
                Executors.newScheduledThreadPool(1, runnable -> new Thread(runnable, "creek-metrics-publisher")));
    }

    /**
     * Creates a publisher with explicit collaborators.
     *
     * @param kafkaMetricsPublisherOptions the publishing options; must not be {@code null}.
     * @param structuredLogger the logger metrics are written to; must not be {@code null}.
     * @param scheduledExecutorService the scheduler used to run publications; must not be {@code null}.
     * @throws NullPointerException if any argument is {@code null}.
     */
    DefaultMetricsPublisher(KafkaMetricsPublisherOptions kafkaMetricsPublisherOptions, StructuredLogger structuredLogger, ScheduledExecutorService scheduledExecutorService) {
        this.options = Objects.requireNonNull(kafkaMetricsPublisherOptions, "options");
        this.logger = Objects.requireNonNull(structuredLogger, "logger");
        this.scheduler = Objects.requireNonNull(scheduledExecutorService, "scheduler");
    }

    /**
     * Schedules periodic publication of the supplied metrics.
     *
     * <p>Does nothing when the configured publish period is zero or negative. Otherwise the first
     * publication happens one period from now and then repeats at a fixed rate of one period.
     *
     * @param supplier source of the current metrics; queried afresh on every publication.
     */
    @Override
    public void schedule(Supplier<Map<MetricName, ? extends Metric>> supplier) {
        long periodMs = this.options.publishPeriod().toMillis();
        if (periodMs <= 0) {
            // A non-positive period means there is nothing to schedule.
            return;
        }
        this.scheduler.scheduleAtFixedRate(() -> log(supplier), periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Shuts the scheduler down and waits up to twice the publish period for it to terminate.
     *
     * <p>A warning is logged if the scheduler has not terminated within that time. If the calling
     * thread is interrupted while waiting, the wait is abandoned and the thread's interrupt flag is
     * set again so that callers can observe it.
     */
    @Override
    public void close() {
        this.scheduler.shutdown();
        try {
            long timeoutMs = 2 * this.options.publishPeriod().toMillis();
            if (!this.scheduler.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                this.logger.warn("Timed out shutting down scheduler");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Writes a single log entry containing all metrics that pass the filter.
     *
     * <p>Any {@link Exception} raised while building or writing the entry is caught and reported
     * as an error log entry rather than propagated to the caller.
     *
     * @param supplier source of the current metrics.
     */
    private void log(Supplier<Map<MetricName, ? extends Metric>> supplier) {
        try {
            this.logger.info("Kafka metrics", entry -> {
                // Iterate the typed map directly: going through a raw Map would erase the
                // key/value types that addMetric requires.
                supplier.get().forEach((name, metric) -> addMetric(entry, name, metric));
            });
        } catch (Exception e) {
            this.logger.error("Logging of Kafka metrics failed", entry -> {
                entry.withThrowable(e);
            });
        }
    }

    /**
     * Adds one metric to the log entry, provided the filter accepts both its name and its value.
     *
     * @param entry the log entry being built.
     * @param name the metric's name; its group and tags determine the namespace.
     * @param metric the metric whose current value is recorded.
     */
    private void addMetric(LogEntryCustomizer entry, MetricName name, Metric metric) {
        KafkaMetricsFilter filter = this.options.metricsFilter();
        if (!filter.includeMetric(name)) {
            return;
        }
        Object value = metric.metricValue();
        if (!filter.includeValue(value)) {
            return;
        }
        nsByTags(entry, name).with(sanitize(name.name()), value);
    }

    /**
     * Resolves the nested namespace a metric is logged under: the sanitized group first, then one
     * nested namespace per filtered tag, applied in sorted order.
     *
     * @param entry the log entry being built.
     * @param name the metric name supplying the group and tags.
     * @return the customizer for the innermost namespace.
     */
    private LogEntryCustomizer nsByTags(LogEntryCustomizer entry, MetricName name) {
        // Single-element holder so the lambda below can replace the current namespace.
        LogEntryCustomizer[] current = {entry.ns(sanitize(name.group()))};
        this.options.metricsFilter().filterTags(name.tags()).sorted().forEachOrdered(tag -> {
            current[0] = current[0].ns(sanitize(tag));
        });
        return current[0];
    }

    /**
     * Replaces every character that is not an ASCII letter or digit with an underscore.
     *
     * @param str the text to sanitize.
     * @return the sanitized text.
     */
    private static String sanitize(String str) {
        return SANITIZE_PATTERN.matcher(str).replaceAll("_");
    }
}
