package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.replication.api.ReplicationMessage;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/CacheProcessor.class */
public class CacheProcessor implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private static final String CACHED_AT_KEY = "_cachedAt";
    private static final Logger logger = LoggerFactory.getLogger(CacheProcessor.class);
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private ProcessorContext<String, ReplicationMessage> context;
    private final Duration cacheTime;
    private final String cacheProcName;
    private final boolean memoryCache;
    private final int maxSize;
    private final Object sync = new Object();
    private boolean clearPersistentCache = false;
    private final Map<String, Record<String, ReplicationMessage>> cache = new ConcurrentHashMap();

    public CacheProcessor(String str, Duration duration, int i, boolean z) {
        this.cacheProcName = str;
        this.cacheTime = duration;
        this.memoryCache = z;
        this.maxSize = i;
        logger.info("Using a cache time of {} seconds for {}", duration, str);
    }

    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        this.lookupStore = processorContext.getStateStore("STORE_" + this.cacheProcName);
        long max = Math.max(this.cacheTime.toMillis() / 10, 1000L);
        this.context.schedule(Duration.ofMillis(max), PunctuationType.WALL_CLOCK_TIME, this::checkCache);
        logger.info("Created persistentCache for {} with check interval of {}ms", this.cacheProcName, Long.valueOf(max));
        if (this.memoryCache) {
            this.clearPersistentCache = true;
        }
    }

    public void process(Record<String, ReplicationMessage> record) {
        String str = (String) record.key();
        ReplicationMessage replicationMessage = (ReplicationMessage) record.value();
        synchronized (this.sync) {
            if (replicationMessage != null) {
                if (replicationMessage.operation() != ReplicationMessage.Operation.DELETE) {
                    if (!this.memoryCache) {
                        this.lookupStore.put(str, replicationMessage.with(CACHED_AT_KEY, Long.valueOf(record.timestamp()), ImmutableMessage.ValueType.LONG));
                    } else if (this.cache.size() > this.maxSize) {
                        logger.warn("Reached max cache size!");
                        this.context.forward(record);
                    } else {
                        this.cache.put(str, record);
                    }
                }
            }
            this.cache.remove(str);
            this.lookupStore.delete(str);
            this.context.forward(record);
        }
    }

    public void close() {
        synchronized (this.sync) {
            for (Map.Entry<String, Record<String, ReplicationMessage>> entry : this.cache.entrySet()) {
                this.context.forward(entry.getValue());
                this.cache.remove(entry.getKey());
            }
        }
    }

    public void checkCache(long j) {
        if (this.clearPersistentCache) {
            clearPersistentCache();
        }
        int i = 0;
        int i2 = 0;
        if (this.memoryCache) {
            HashSet<String> hashSet = new HashSet();
            for (Map.Entry<String, Record<String, ReplicationMessage>> entry : this.cache.entrySet()) {
                if (isExpired(j, entry.getValue())) {
                    hashSet.add(entry.getKey());
                }
            }
            synchronized (this.sync) {
                for (String str : hashSet) {
                    if (this.cache.get(str) != null) {
                        this.context.forward(this.cache.get(str));
                        this.cache.remove(str);
                    }
                }
            }
        } else {
            HashSet<String> hashSet2 = new HashSet();
            KeyValueIterator all = this.lookupStore.all();
            while (all.hasNext()) {
                KeyValue keyValue = (KeyValue) all.next();
                i++;
                if (j - ((Long) ((ReplicationMessage) keyValue.value).value(CACHED_AT_KEY).orElse(0L)).longValue() >= this.cacheTime.toMillis()) {
                    hashSet2.add((String) keyValue.key);
                }
            }
            synchronized (this.sync) {
                for (String str2 : hashSet2) {
                    ReplicationMessage replicationMessage = (ReplicationMessage) this.lookupStore.get(str2);
                    if (replicationMessage != null) {
                        long longValue = ((Long) replicationMessage.value(CACHED_AT_KEY).orElse(0L)).longValue();
                        if (j - longValue >= this.cacheTime.toMillis()) {
                            i2++;
                            this.context.forward(new Record(str2, replicationMessage.without(CACHED_AT_KEY), longValue));
                            this.lookupStore.delete(str2);
                        }
                    }
                }
            }
        }
        long currentTimeMillis = System.currentTimeMillis() - j;
        if (i > 0) {
            logger.info("Checked cache {} - {} entries, {} expired entries in {}ms", new Object[]{this.cacheProcName, Integer.valueOf(i), Integer.valueOf(i2), Long.valueOf(currentTimeMillis)});
        }
    }

    private void clearPersistentCache() {
        HashSet hashSet = new HashSet();
        KeyValueIterator all = this.lookupStore.all();
        while (all.hasNext()) {
            KeyValue keyValue = (KeyValue) all.next();
            this.context.forward(new Record((String) keyValue.key, ((ReplicationMessage) keyValue.value).without(CACHED_AT_KEY), ((ReplicationMessage) keyValue.value).timestamp()));
            hashSet.add((String) keyValue.key);
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            this.lookupStore.delete((String) it.next());
        }
        this.clearPersistentCache = false;
    }

    private boolean isExpired(long j, Record<String, ReplicationMessage> record) {
        return j - record.timestamp() > this.cacheTime.toMillis();
    }
}
