package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.replication.factory.ReplicationFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/HistoryProcessor.class */
public class HistoryProcessor implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String lookupStoreName;
    private final String keyCounterStoreName;
    private ProcessorContext<String, ReplicationMessage> context;
    private static final Logger logger = LoggerFactory.getLogger(HistoryProcessor.class);
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private KeyValueStore<String, Long> keyCountStore;

    public HistoryProcessor(String str, String str2) {
        this.lookupStoreName = str;
        this.keyCounterStoreName = str2;
    }

    public void init(ProcessorContext<String, ReplicationMessage> processorContext) {
        this.context = processorContext;
        this.lookupStore = processorContext.getStateStore(this.lookupStoreName);
        this.keyCountStore = processorContext.getStateStore(this.keyCounterStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        Long valueOf;
        String str = (String) record.key();
        if (record.value() == null) {
            processDelete(str);
            return;
        }
        if (((ReplicationMessage) record.value()).operation() == ReplicationMessage.Operation.DELETE) {
            processDelete(str);
            return;
        }
        Long l = (Long) this.keyCountStore.get(str);
        if (l == null) {
            valueOf = 0L;
            this.keyCountStore.put(str, (Object) null);
        } else {
            valueOf = Long.valueOf(l.longValue() + 1);
        }
        this.lookupStore.put(str + "|" + String.format("%08d", valueOf), (ReplicationMessage) record.value());
        forwardHistory(str);
    }

    private void processDelete(String str) {
        ArrayList arrayList = new ArrayList();
        KeyValueIterator range = this.lookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            try {
                arrayList.add((String) ((KeyValue) range.next()).key);
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.lookupStore.delete((String) it.next());
        }
        ReplicationMessage withSubMessages = ReplicationFactory.empty().withSubMessages("list", Collections.emptyList());
        this.context.forward(new Record(str, withSubMessages, withSubMessages.timestamp()));
    }

    private void forwardHistory(String str) {
        logger.warn("whoop");
        ArrayList arrayList = new ArrayList();
        KeyValueIterator range = this.lookupStore.range(str + "|", str + "}");
        while (range.hasNext()) {
            try {
                arrayList.add(((ReplicationMessage) ((KeyValue) range.next()).value).message());
            } catch (Throwable th) {
                if (range != null) {
                    try {
                        range.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        if (range != null) {
            range.close();
        }
        ReplicationMessage withSubMessages = ReplicationFactory.empty().withSubMessages("list", arrayList);
        this.context.forward(new Record(str, withSubMessages, withSubMessages.timestamp()));
    }
}
