package org.apache.kafka.streams.state.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import kotlin.jvm.internal.LongCompanionObject;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/ThreadCache.class */
/**
 * Groups several {@link NamedCache} instances, keyed by name, under one shared byte budget.
 *
 * <p>After every {@link #put} / {@link #putIfAbsent} the combined {@link #sizeBytes()} of all
 * caches is compared against {@code maxCacheSizeBytes}; while it is exceeded, entries are evicted
 * from the cache that was just written to. The class also counts puts, gets, evictions and
 * flushes and exposes those counters.
 *
 * <p>Thread-safety: only {@code getCache}, {@code getOrCreateCache} and {@link #close(String)} are
 * {@code synchronized}. The counters and the size aggregation are not guarded.
 * NOTE(review): judging by the class name an instance is presumably confined to one thread
 * for everything except cache lookup/creation/removal -- confirm against callers.
 */
public class ThreadCache {
    private final Logger log;
    private final long maxCacheSizeBytes;
    private final StreamsMetricsImpl metrics;
    private final Map<String, NamedCache> caches = new HashMap<>();
    private long numPuts = 0;
    private long numGets = 0;
    private long numEvicts = 0;
    private long numFlushes = 0;

    /**
     * Immutable holder passed to a {@link DirtyEntryFlushListener}: a key, its new value, and the
     * {@link LRUCacheEntry} it belongs to.
     */
    static class DirtyEntry {
        private final Bytes key;
        private final byte[] newValue;
        private final LRUCacheEntry recordContext;

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * @param bytes         the entry's key
         * @param bArr          the entry's new value; stored by reference, not copied
         * @param lRUCacheEntry the cache entry returned by {@link #entry()}
         */
        public DirtyEntry(Bytes bytes, byte[] bArr, LRUCacheEntry lRUCacheEntry) {
            this.key = bytes;
            this.newValue = bArr;
            this.recordContext = lRUCacheEntry;
        }

        /** @return the key given at construction */
        public Bytes key() {
            return this.key;
        }

        /** @return the value array given at construction (same reference, no defensive copy) */
        public byte[] newValue() {
            return this.newValue;
        }

        /** @return the {@link LRUCacheEntry} given at construction */
        public LRUCacheEntry entry() {
            return this.recordContext;
        }
    }

    /**
     * Callback that receives a batch of {@link DirtyEntry} objects. It is registered per cache
     * through {@link ThreadCache#addDirtyEntryFlushListener}.
     * NOTE(review): when {@link NamedCache} invokes it is not visible in this file -- presumably
     * on flush and/or eviction; confirm in {@code NamedCache}.
     */
    public interface DirtyEntryFlushListener {
        void apply(List<DirtyEntry> list);
    }

    /**
     * Iterator that walks a sequence of keys and resolves each one against a {@link NamedCache}
     * at iteration time. Keys whose lookup returns {@code null} are skipped.
     */
    static class MemoryLRUCacheBytesIterator implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
        private final Iterator<Bytes> keys;
        private final NamedCache cache;
        // Look-ahead slot: non-null once the next existing entry has been resolved.
        private KeyValue<Bytes, LRUCacheEntry> nextEntry;

        /**
         * @param it         the keys to visit, in iteration order
         * @param namedCache the cache each key is looked up in
         */
        MemoryLRUCacheBytesIterator(Iterator<Bytes> it, NamedCache namedCache) {
            this.keys = it;
            this.cache = namedCache;
        }

        /**
         * @return the key of the entry the next call to {@link #next()} would return
         * @throws NoSuchElementException if no further entry exists
         */
        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Bytes peekNextKey() {
            if (hasNext()) {
                return this.nextEntry.key;
            }
            throw new NoSuchElementException();
        }

        /**
         * @return the entry the next call to {@link #next()} would return, without consuming it
         * @throws NoSuchElementException if no further entry exists
         */
        @Override // org.apache.kafka.streams.state.internals.PeekingKeyValueIterator
        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
            if (hasNext()) {
                return this.nextEntry;
            }
            throw new NoSuchElementException();
        }

        /**
         * Advances through the key iterator until a key resolves to a non-null cache entry or the
         * keys run out, buffering the resolved entry in the look-ahead slot.
         *
         * @return {@code true} if an entry is buffered and ready to be returned
         */
        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.nextEntry != null) {
                return true;
            }
            while (this.keys.hasNext() && this.nextEntry == null) {
                internalNext();
            }
            return this.nextEntry != null;
        }

        /**
         * @return the buffered entry; the look-ahead slot is cleared afterwards
         * @throws NoSuchElementException if no further entry exists
         */
        @Override // java.util.Iterator
        public KeyValue<Bytes, LRUCacheEntry> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Bytes, LRUCacheEntry> result = this.nextEntry;
            this.nextEntry = null;
            return result;
        }

        /** Consumes one key and buffers its entry if the cache returns a non-null value for it. */
        private void internalNext() {
            Bytes candidate = this.keys.next();
            LRUCacheEntry found = this.cache.get(candidate);
            if (found != null) {
                this.nextEntry = new KeyValue<>(candidate, found);
            }
        }

        /** No-op: this iterator holds nothing that needs releasing. */
        @Override // org.apache.kafka.streams.state.KeyValueIterator, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
        }
    }

    /**
     * @param logContext         used to obtain this class's logger
     * @param j                  maximum combined size, in bytes, of all caches before eviction
     *                           is triggered
     * @param streamsMetricsImpl metrics handle passed to every {@link NamedCache} created here
     */
    public ThreadCache(LogContext logContext, long j, StreamsMetricsImpl streamsMetricsImpl) {
        this.maxCacheSizeBytes = j;
        this.metrics = streamsMetricsImpl;
        this.log = logContext.logger(getClass());
    }

    /** @return number of puts counted so far; see {@link #put} and {@link #putIfAbsent} */
    public long puts() {
        return this.numPuts;
    }

    /** @return number of {@link #get} calls so far, including those that returned {@code null} */
    public long gets() {
        return this.numGets;
    }

    /** @return number of entries evicted because the byte budget was exceeded */
    public long evicts() {
        return this.numEvicts;
    }

    /** @return number of {@link #flush} calls so far, including those for unknown cache names */
    public long flushes() {
        return this.numFlushes;
    }

    /**
     * Builds a cache name of the form {@code <str>-<str2>}.
     *
     * @param str  first component (a task id, going by the method name)
     * @param str2 second component (a store name, going by the method name)
     * @return the two components joined by a single {@code '-'}
     */
    public static String nameSpaceFromTaskIdAndStore(String str, String str2) {
        return str + "-" + str2;
    }

    /**
     * @param str a cache name
     * @return everything before the first {@code '-'}, or the whole string if it contains none
     */
    public static String taskIDfromCacheName(String str) {
        return str.split("-", 2)[0];
    }

    /**
     * @param str a cache name
     * @return everything after the first {@code '-'}
     * @throws ArrayIndexOutOfBoundsException if {@code str} contains no {@code '-'}
     */
    public static String underlyingStoreNamefromCacheName(String str) {
        return str.split("-", 2)[1];
    }

    /**
     * Registers a listener on the named cache, creating the cache if it does not exist yet.
     *
     * @param str                     the cache name
     * @param dirtyEntryFlushListener the listener handed to {@link NamedCache#setListener}
     */
    public void addDirtyEntryFlushListener(String str, DirtyEntryFlushListener dirtyEntryFlushListener) {
        getOrCreateCache(str).setListener(dirtyEntryFlushListener);
    }

    /**
     * Flushes the named cache if it exists. The flush counter is incremented even when no cache
     * with that name is present.
     *
     * @param str the cache name
     */
    public void flush(String str) {
        this.numFlushes++;
        NamedCache target = getCache(str);
        if (target == null) {
            return;
        }
        target.flush();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Cache stats on flush: #puts={}, #gets={}, #evicts={}, #flushes={}", puts(), gets(), evicts(), flushes());
        }
    }

    /**
     * Looks up a key in the named cache. The get counter is incremented on every call.
     *
     * @param str   the cache name
     * @param bytes the key; may be {@code null}
     * @return the cached entry, or {@code null} if the key is {@code null}, the cache does not
     *         exist, or the cache returns {@code null} for the key
     */
    public LRUCacheEntry get(String str, Bytes bytes) {
        this.numGets++;
        if (bytes == null) {
            return null;
        }
        NamedCache source = getCache(str);
        if (source == null) {
            return null;
        }
        return source.get(bytes);
    }

    /**
     * Stores an entry in the named cache (creating the cache if needed), then evicts from that
     * cache while the combined byte budget is exceeded.
     *
     * @param str           the cache name
     * @param bytes         the key
     * @param lRUCacheEntry the entry to store
     */
    public void put(String str, Bytes bytes, LRUCacheEntry lRUCacheEntry) {
        this.numPuts++;
        getOrCreateCache(str).put(bytes, lRUCacheEntry);
        maybeEvict(str);
    }

    /**
     * Delegates to {@link NamedCache#putIfAbsent} on the named cache (creating it if needed), then
     * runs the eviction check. The put counter is incremented only when the delegate returned
     * {@code null}.
     *
     * @param str           the cache name
     * @param bytes         the key
     * @param lRUCacheEntry the entry to store if the key is absent
     * @return whatever {@link NamedCache#putIfAbsent} returned
     */
    public LRUCacheEntry putIfAbsent(String str, Bytes bytes, LRUCacheEntry lRUCacheEntry) {
        LRUCacheEntry previous = getOrCreateCache(str).putIfAbsent(bytes, lRUCacheEntry);
        maybeEvict(str);
        if (previous == null) {
            this.numPuts++;
        }
        return previous;
    }

    /**
     * Calls {@link #put} once per element, in list order, so the eviction check runs after every
     * single entry.
     *
     * @param str  the cache name
     * @param list the key/entry pairs to store
     */
    public void putAll(String str, List<KeyValue<Bytes, LRUCacheEntry>> list) {
        for (KeyValue<Bytes, LRUCacheEntry> pair : list) {
            put(str, pair.key, pair.value);
        }
    }

    /**
     * @param str   the cache name
     * @param bytes the key to remove
     * @return the result of {@link NamedCache#delete}, or {@code null} if the cache does not exist
     */
    public LRUCacheEntry delete(String str, Bytes bytes) {
        NamedCache target = getCache(str);
        return target == null ? null : target.delete(bytes);
    }

    /**
     * @param str    the cache name
     * @param bytes  first bound, passed through to {@link NamedCache#keyRange}
     * @param bytes2 second bound, passed through to {@link NamedCache#keyRange}
     * @return an iterator over the keys returned by {@code keyRange}; empty if the cache does not
     *         exist
     */
    public MemoryLRUCacheBytesIterator range(String str, Bytes bytes, Bytes bytes2) {
        NamedCache source = getCache(str);
        if (source == null) {
            return emptyIterator(str);
        }
        return new MemoryLRUCacheBytesIterator(source.keyRange(bytes, bytes2), source);
    }

    /**
     * @param str    the cache name
     * @param bytes  first bound, passed through to {@link NamedCache#reverseKeyRange}
     * @param bytes2 second bound, passed through to {@link NamedCache#reverseKeyRange}
     * @return an iterator over the keys returned by {@code reverseKeyRange}; empty if the cache
     *         does not exist
     */
    public MemoryLRUCacheBytesIterator reverseRange(String str, Bytes bytes, Bytes bytes2) {
        NamedCache source = getCache(str);
        if (source == null) {
            return emptyIterator(str);
        }
        return new MemoryLRUCacheBytesIterator(source.reverseKeyRange(bytes, bytes2), source);
    }

    /**
     * @param str the cache name
     * @return an iterator over the keys returned by {@link NamedCache#allKeys}; empty if the
     *         cache does not exist
     */
    public MemoryLRUCacheBytesIterator all(String str) {
        NamedCache source = getCache(str);
        if (source == null) {
            return emptyIterator(str);
        }
        return new MemoryLRUCacheBytesIterator(source.allKeys(), source);
    }

    /**
     * @param str the cache name
     * @return an iterator over the keys returned by {@link NamedCache#reverseAllKeys}; empty if
     *         the cache does not exist
     */
    public MemoryLRUCacheBytesIterator reverseAll(String str) {
        NamedCache source = getCache(str);
        if (source == null) {
            return emptyIterator(str);
        }
        return new MemoryLRUCacheBytesIterator(source.reverseAllKeys(), source);
    }

    /**
     * Builds the iterator returned when the requested cache does not exist. It is backed by a
     * throw-away {@link NamedCache} that is NOT registered in {@code caches}.
     */
    private MemoryLRUCacheBytesIterator emptyIterator(String cacheName) {
        return new MemoryLRUCacheBytesIterator(Collections.<Bytes>emptyIterator(), new NamedCache(cacheName, this.metrics));
    }

    /**
     * @return the sum of {@link NamedCache#size()} over all caches, or {@link Long#MAX_VALUE} if
     *         the running sum overflows
     */
    public long size() {
        long total = 0;
        for (NamedCache each : this.caches.values()) {
            total += each.size();
            if (isOverflowing(total)) {
                return Long.MAX_VALUE;
            }
        }
        return total;
    }

    /** A running sum of the cache sizes that has gone negative has wrapped past Long.MAX_VALUE. */
    private boolean isOverflowing(long runningTotal) {
        return runningTotal < 0;
    }

    /**
     * @return the sum of {@link NamedCache#sizeInBytes()} over all caches, or
     *         {@link Long#MAX_VALUE} if the running sum overflows
     */
    long sizeBytes() {
        long total = 0;
        for (NamedCache each : this.caches.values()) {
            total += each.sizeInBytes();
            if (isOverflowing(total)) {
                return Long.MAX_VALUE;
            }
        }
        return total;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Unregisters the named cache and closes it. Does nothing if no such cache exists.
     *
     * @param str the cache name
     */
    public synchronized void close(String str) {
        NamedCache removed = this.caches.remove(str);
        if (removed != null) {
            removed.close();
        }
    }

    /**
     * Evicts entries from the named cache, one at a time, while the combined size of ALL caches
     * exceeds the byte budget.
     */
    private void maybeEvict(String cacheName) {
        int evictedCount = 0;
        while (sizeBytes() > this.maxCacheSizeBytes) {
            NamedCache victim = getOrCreateCache(cacheName);
            // Only this cache is ever evicted from. Once it is empty nothing more can be freed
            // here even if the total is still over budget, so stop instead of looping forever.
            // Note that this early exit also skips the trace log below.
            if (victim.isEmpty()) {
                return;
            }
            victim.evict();
            this.numEvicts++;
            evictedCount++;
        }
        if (this.log.isTraceEnabled()) {
            this.log.trace("Evicted {} entries from cache {}", evictedCount, cacheName);
        }
    }

    /** @return the cache registered under the given name, or {@code null} if there is none */
    private synchronized NamedCache getCache(String cacheName) {
        return this.caches.get(cacheName);
    }

    /** @return the cache registered under the given name, creating and registering it if absent */
    private synchronized NamedCache getOrCreateCache(String cacheName) {
        NamedCache existing = this.caches.get(cacheName);
        if (existing == null) {
            existing = new NamedCache(cacheName, this.metrics);
            this.caches.put(cacheName, existing);
        }
        return existing;
    }
}
