package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.0.0.jar:org/apache/kafka/streams/state/internals/NamedCache.class */
public class NamedCache {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) NamedCache.class);
    private final String name;
    private ThreadCache.DirtyEntryFlushListener listener;
    private LRUNode tail;
    private LRUNode head;
    private long currentSizeBytes;
    private final NamedCacheMetrics namedCacheMetrics;
    private final TreeMap<Bytes, LRUNode> cache = new TreeMap<>();
    private final Set<Bytes> dirtyKeys = new LinkedHashSet();
    private long numReadHits = 0;
    private long numReadMisses = 0;
    private long numOverwrites = 0;
    private long numFlushes = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.0.0.jar:org/apache/kafka/streams/state/internals/NamedCache$LRUNode.class */
    /**
     * One slot of the cache: an immutable key, the entry currently stored under it,
     * and links to the neighbouring nodes of the doubly linked list that the
     * enclosing cache maintains through its {@code head} and {@code tail} fields.
     */
    public static class LRUNode {
        private final Bytes key;
        private LRUCacheEntry entry;
        private LRUNode previous;
        private LRUNode next;

        /**
         * Creates an unlinked node.
         *
         * @param bytes         key of the node
         * @param lRUCacheEntry entry initially stored in the node
         */
        LRUNode(Bytes bytes, LRUCacheEntry lRUCacheEntry) {
            key = bytes;
            entry = lRUCacheEntry;
        }

        /** @return the key this node was created with */
        Bytes key() {
            return key;
        }

        /** @return the entry currently stored in this node */
        LRUCacheEntry entry() {
            return entry;
        }

        /** @return the node linked before this one, or {@code null} if there is none */
        LRUNode previous() {
            return previous;
        }

        /** @return the node linked after this one, or {@code null} if there is none */
        LRUNode next() {
            return next;
        }

        /**
         * Size accounted to this node: key length, a fixed 24-byte overhead and the
         * size reported by the entry.
         *
         * @return the accounted size of this node
         */
        long size() {
            // NOTE(review): the 24 bytes are presumably 8 each for the entry/previous/next
            // references - confirm.
            final int keyLength = key.get().length;
            return keyLength + 24 + entry.size();
        }

        /**
         * Replaces the entry stored in this node; key and links are left untouched.
         *
         * @param lRUCacheEntry the new entry
         */
        public void update(LRUCacheEntry lRUCacheEntry) {
            entry = lRUCacheEntry;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-2.0.0.jar:org/apache/kafka/streams/state/internals/NamedCache$NamedCacheMetrics.class */
    /**
     * Registers the hit-ratio sensors for one named cache and remembers what is
     * needed to remove the cache-level sensors again.
     */
    public static class NamedCacheMetrics {
        private static final String METRIC_GROUP = "stream-record-cache-metrics";

        private final StreamsMetricsImpl metrics;
        private final Sensor hitRatioSensor;
        private final String taskName;
        private final String cacheName;

        /**
         * Creates a task-level and a cache-level "hitRatio" sensor, each with avg, min
         * and max metrics. The task name is derived from the cache name through
         * {@code ThreadCache.taskIDfromCacheName}.
         *
         * @param streamsMetricsImpl metrics registry used to create the sensors
         * @param str                name of the cache
         */
        private NamedCacheMetrics(StreamsMetricsImpl streamsMetricsImpl, String str) {
            taskName = ThreadCache.taskIDfromCacheName(str);
            cacheName = str;
            metrics = streamsMetricsImpl;

            // NOTE(review): StreamsConfig.OPTIMIZE as a tag value looks like a decompiler
            // constant substitution for a string literal - confirm before relying on it.
            final Map<String, String> taskLevelTags =
                streamsMetricsImpl.tagMap("record-cache-id", StreamsConfig.OPTIMIZE, "task-id", taskName);
            final Sensor taskLevelSensor =
                streamsMetricsImpl.taskLevelSensor("hitRatio", taskName, Sensor.RecordingLevel.DEBUG, new Sensor[0]);
            addHitRatioMetrics(taskLevelSensor, taskLevelTags);

            final Map<String, String> cacheLevelTags = streamsMetricsImpl.tagMap(
                "record-cache-id", ThreadCache.underlyingStoreNamefromCacheName(str), "task-id", taskName);
            // The task-level sensor is handed over as the trailing argument
            // (presumably as a parent sensor - confirm).
            hitRatioSensor = streamsMetricsImpl.cacheLevelSensor(
                taskName, str, "hitRatio", Sensor.RecordingLevel.DEBUG, taskLevelSensor);
            addHitRatioMetrics(hitRatioSensor, cacheLevelTags);
        }

        /** Adds the avg, min and max hit-ratio metrics, in that order, to the given sensor. */
        private static void addHitRatioMetrics(Sensor sensor, Map<String, String> tags) {
            sensor.add(new MetricName("hitRatio-avg", METRIC_GROUP, "The average cache hit ratio.", tags), new Avg());
            sensor.add(new MetricName("hitRatio-min", METRIC_GROUP, "The minimum cache hit ratio.", tags), new Min());
            sensor.add(new MetricName("hitRatio-max", METRIC_GROUP, "The maximum cache hit ratio.", tags), new Max());
        }

        /** Asks the metrics registry to remove all cache-level sensors of this task and cache. */
        public void removeAllSensors() {
            metrics.removeAllCacheLevelSensors(taskName, cacheName);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an empty cache and registers its hit-ratio metrics.
     *
     * @param str                name of this cache
     * @param streamsMetricsImpl metrics registry handed to {@link NamedCacheMetrics}
     */
    public NamedCache(String str, StreamsMetricsImpl streamsMetricsImpl) {
        name = str;
        namedCacheMetrics = new NamedCacheMetrics(streamsMetricsImpl, str);
    }

    /** @return the name this cache was constructed with */
    final synchronized String name() {
        return name;
    }

    /** @return how many internal lookups found a node for the requested key */
    synchronized long hits() {
        return numReadHits;
    }

    /** @return how many internal lookups found no node for the requested key */
    synchronized long misses() {
        return numReadMisses;
    }

    /** @return how many {@code put} calls replaced the entry of an already cached key */
    synchronized long overwrites() {
        return numOverwrites;
    }

    /** @return how many times a flush has been started on this cache */
    synchronized long flushes() {
        return numFlushes;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Looks up the entry for a key and, on a hit, moves its node to the head of the
     * LRU list.
     *
     * @param bytes key to look up; {@code null} yields {@code null} without touching
     *              the hit/miss counters
     * @return the cached entry, or {@code null} if the key is {@code null} or not cached
     */
    public synchronized LRUCacheEntry get(Bytes bytes) {
        if (bytes == null) {
            return null;
        }
        final LRUNode node = getInternal(bytes);
        if (node == null) {
            return null;
        }
        updateLRU(node);
        return node.entry;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the listener that receives dirty entries on flush, replacing any previous one.
     *
     * @param dirtyEntryFlushListener the listener to use from now on
     */
    public synchronized void setListener(ThreadCache.DirtyEntryFlushListener dirtyEntryFlushListener) {
        listener = dirtyEntryFlushListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Flushes all dirty entries to the registered listener; no evicted node is involved.
     *
     * @throws IllegalArgumentException if no listener has been registered
     */
    public synchronized void flush() {
        final LRUNode noEvictedNode = null;
        flush(noEvictedNode);
    }

    /**
     * Hands every dirty entry to the registered listener and marks the entries clean.
     *
     * <p>Entries whose value is {@code null} are deleted from the cache after the
     * listener has been applied. Each dirty key is looked up through
     * {@code getInternal}, so a flush also bumps the hit/miss counters.
     *
     * @param lRUNode a node that has already been removed from the cache by eviction and
     *                must be flushed first, or {@code null} for a plain flush
     * @throws IllegalArgumentException if no listener has been registered
     * @throws IllegalStateException    if a dirty key has no node in the cache
     */
    private void flush(LRUNode lRUNode) {
        this.numFlushes++;
        if (log.isTraceEnabled()) {
            log.trace("Named cache {} stats on flush: #hits={}, #misses={}, #overwrites={}, #flushes={}",
                this.name, hits(), misses(), overwrites(), flushes());
        }
        if (this.listener == null) {
            throw new IllegalArgumentException("No listener for namespace " + this.name + " registered with cache");
        }
        if (this.dirtyKeys.isEmpty()) {
            return;
        }

        final List<ThreadCache.DirtyEntry> entries = new ArrayList<>();
        final List<Bytes> deleted = new ArrayList<>();

        // The evicted node is no longer in the cache map, so it cannot be found through
        // the dirty-key loop below; add it directly and drop its key from the dirty set.
        if (lRUNode != null) {
            entries.add(new ThreadCache.DirtyEntry(lRUNode.key, lRUNode.entry.value(), lRUNode.entry));
            this.dirtyKeys.remove(lRUNode.key);
        }

        for (final Bytes key : this.dirtyKeys) {
            final LRUNode node = getInternal(key);
            if (node == null) {
                throw new IllegalStateException("Key = " + key + " found in dirty key set, but entry is null");
            }
            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry));
            node.entry.markClean();
            if (node.entry.value() == null) {
                deleted.add(node.key);
            }
        }

        // Clear the dirty set before invoking the listener, so the set is already empty
        // if the listener calls back into this cache.
        this.dirtyKeys.clear();
        this.listener.apply(entries);
        for (final Bytes key : deleted) {
            delete(key);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores an entry under a key, overwriting any existing entry, and moves the node
     * to the head of the LRU list. Dirty entries are additionally (re-)registered at
     * the end of the dirty-key set.
     *
     * @param bytes         key to store under
     * @param lRUCacheEntry entry to store
     * @throws IllegalStateException if a clean entry is put for a key that is currently dirty
     */
    public synchronized void put(Bytes bytes, LRUCacheEntry lRUCacheEntry) {
        final boolean cleanOverDirty = !lRUCacheEntry.isDirty() && this.dirtyKeys.contains(bytes);
        if (cleanOverDirty) {
            throw new IllegalStateException(String.format("Attempting to put a clean entry for key [%s] into NamedCache [%s] when it already contains a dirty entry for the same key", bytes, this.name));
        }

        LRUNode node = this.cache.get(bytes);
        if (node == null) {
            // New key: link a fresh node at the head and index it.
            node = new LRUNode(bytes, lRUCacheEntry);
            putHead(node);
            this.cache.put(bytes, node);
        } else {
            // Existing key: take the old size out of the total before swapping the entry.
            this.numOverwrites++;
            this.currentSizeBytes -= node.size();
            node.update(lRUCacheEntry);
            updateLRU(node);
        }

        if (lRUCacheEntry.isDirty()) {
            // Remove-then-add moves the key to the end of the insertion-ordered dirty set.
            this.dirtyKeys.remove(bytes);
            this.dirtyKeys.add(bytes);
        }
        this.currentSizeBytes += node.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the running total of the sizes of all nodes currently accounted to this cache */
    public synchronized long sizeInBytes() {
        return currentSizeBytes;
    }

    /**
     * Looks up the node for a key and maintains the read statistics.
     *
     * <p>A miss only increments the miss counter. A hit increments the hit counter and
     * records the current hit ratio, {@code hits / (hits + misses)}, on the hit-ratio
     * sensor. The LRU order is not touched here.
     *
     * @param bytes key to look up
     * @return the node stored under the key, or {@code null} if there is none
     */
    private LRUNode getInternal(Bytes bytes) {
        final LRUNode node = this.cache.get(bytes);
        if (node == null) {
            this.numReadMisses++;
            return null;
        }
        this.numReadHits++;
        // Divide in floating point: long / long would truncate the ratio to 0 or 1.
        final double hitRatio = (double) this.numReadHits / (double) (this.numReadHits + this.numReadMisses);
        this.namedCacheMetrics.hitRatioSensor.record(hitRatio);
        return node;
    }

    /**
     * Moves a linked node to the head of the LRU list by unlinking and re-inserting it.
     *
     * @param lRUNode node that is currently part of the list
     */
    private void updateLRU(LRUNode lRUNode) {
        // Unlink first so the neighbours are stitched together before the node moves.
        remove(lRUNode);
        putHead(lRUNode);
    }

    /**
     * Unlinks a node from the LRU list, fixing up {@code head} and {@code tail} when the
     * node sits at either end. The node's own links are left as they were.
     *
     * @param lRUNode node to unlink
     */
    private void remove(LRUNode lRUNode) {
        final LRUNode before = lRUNode.previous;
        if (before == null) {
            // Node was the head: its successor becomes the new head.
            this.head = lRUNode.next;
        } else {
            before.next = lRUNode.next;
        }

        final LRUNode after = lRUNode.next;
        if (after != null) {
            after.previous = lRUNode.previous;
        } else {
            // Node was the tail: its predecessor becomes the new tail.
            this.tail = lRUNode.previous;
        }
    }

    /**
     * Links a node in front of the current head. If the list has no tail yet, the node
     * becomes the tail as well.
     *
     * @param lRUNode node to insert at the head
     */
    private void putHead(LRUNode lRUNode) {
        final LRUNode formerHead = this.head;
        lRUNode.next = formerHead;
        lRUNode.previous = null;
        if (formerHead != null) {
            formerHead.previous = lRUNode;
        }
        this.head = lRUNode;
        if (this.tail == null) {
            this.tail = lRUNode;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes the node at the tail of the LRU list, if any. When the removed entry is
     * dirty, a flush is triggered with that node so it reaches the listener although it
     * is no longer in the cache.
     */
    public synchronized void evict() {
        final LRUNode eldest = this.tail;
        if (eldest == null) {
            return;
        }
        this.currentSizeBytes -= eldest.size();
        remove(eldest);
        this.cache.remove(eldest.key);
        if (eldest.entry.isDirty()) {
            flush(eldest);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the entry only if {@link #get(Bytes)} finds nothing for the key. The lookup
     * counts as a regular read (statistics and LRU order are updated).
     *
     * @param bytes         key to store under
     * @param lRUCacheEntry entry to store when the key is absent
     * @return the entry already cached for the key, or {@code null} if the new entry was stored
     */
    public synchronized LRUCacheEntry putIfAbsent(Bytes bytes, LRUCacheEntry lRUCacheEntry) {
        final LRUCacheEntry existing = get(bytes);
        if (existing != null) {
            return existing;
        }
        put(bytes, lRUCacheEntry);
        return null;
    }

    /**
     * Puts every pair of the list, in list order, wrapping each raw key with
     * {@code Bytes.wrap}.
     *
     * @param list key/entry pairs to store
     */
    synchronized void putAll(List<KeyValue<byte[], LRUCacheEntry>> list) {
        for (final KeyValue<byte[], LRUCacheEntry> pair : list) {
            final Bytes wrappedKey = Bytes.wrap(pair.key);
            put(wrappedKey, pair.value);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes a key from the cache map, the LRU list and the dirty-key set, and
     * subtracts the node's size from the running total.
     *
     * @param bytes key to remove
     * @return the entry that was stored under the key, or {@code null} if it was not cached
     */
    public synchronized LRUCacheEntry delete(Bytes bytes) {
        final LRUNode removedNode = this.cache.remove(bytes);
        if (removedNode == null) {
            return null;
        }
        remove(removedNode);
        this.dirtyKeys.remove(bytes);
        this.currentSizeBytes -= removedNode.size();
        return removedNode.entry();
    }

    /**
     * Number of keys currently held in the cache map. Unlike the other accessors of
     * this class, this method is not {@code synchronized}.
     *
     * @return the number of cached keys
     */
    public long size() {
        final int entryCount = this.cache.size();
        return entryCount;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Iterates over a copy of the cached keys between the two bounds, both inclusive.
     *
     * @param bytes  lower bound (inclusive)
     * @param bytes2 upper bound (inclusive)
     * @return iterator over the matching keys in sorted order
     */
    public synchronized Iterator<Bytes> keyRange(Bytes bytes, Bytes bytes2) {
        final Set<Bytes> keysInRange = this.cache.navigableKeySet().subSet(bytes, true, bytes2, true);
        return keySetIterator(keysInRange);
    }

    /**
     * Copies the given keys into a new sorted set and returns an iterator over the copy,
     * so the iterator does not reflect later changes to the cache.
     *
     * @param set keys to snapshot
     * @return iterator over the copied keys in their natural order
     */
    private Iterator<Bytes> keySetIterator(Set<Bytes> set) {
        // Parameterized instead of a raw TreeSet, which forced an unchecked conversion.
        final Set<Bytes> snapshot = new TreeSet<>(set);
        return snapshot.iterator();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return iterator over a sorted copy of all keys currently in the cache */
    public synchronized Iterator<Bytes> allKeys() {
        final Set<Bytes> everyKey = this.cache.navigableKeySet();
        return keySetIterator(everyKey);
    }

    /** @return the entry of the node at the head of the LRU list, or {@code null} if the list is empty */
    synchronized LRUCacheEntry first() {
        return this.head != null ? this.head.entry : null;
    }

    /** @return the entry of the node at the tail of the LRU list, or {@code null} if the list is empty */
    synchronized LRUCacheEntry last() {
        return this.tail != null ? this.tail.entry : null;
    }

    /** @return the node at the head of the LRU list, or {@code null} if the list is empty */
    synchronized LRUNode head() {
        return head;
    }

    /** @return the node at the tail of the LRU list, or {@code null} if the list is empty */
    synchronized LRUNode tail() {
        return tail;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resets the cache to an empty state: drops the LRU list, the listener, all cached
     * and dirty keys, zeroes the size total, and finally removes the cache's sensors.
     * Dirty entries are discarded without being flushed.
     */
    public synchronized void close() {
        // Drop the linked list and the listener reference.
        this.head = null;
        this.tail = null;
        this.listener = null;

        // Forget all contents and their accounted size.
        this.currentSizeBytes = 0L;
        this.cache.clear();
        this.dirtyKeys.clear();

        this.namedCacheMetrics.removeAllSensors();
    }
}
