package io.floodplain.streams.remotejoin;

import io.floodplain.replication.api.ReplicationMessage;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/CompareToProcessor.class */
public class CompareToProcessor implements Processor<String, ReplicationMessage, String, ReplicationMessage> {
    private final String lookupStoreName;
    private KeyValueStore<String, ReplicationMessage> lookupStore;
    private static final Logger logger = LoggerFactory.getLogger(CompareToProcessor.class);
    private ProcessorContext<String, ReplicationMessage> context;

    public CompareToProcessor(String str) {
        this.lookupStoreName = str;
    }

    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
        this.lookupStore = processorContext.getStateStore(this.lookupStoreName);
    }

    public void process(Record<String, ReplicationMessage> record) {
        ReplicationMessage replicationMessage = (ReplicationMessage) record.value();
        String str = (String) record.key();
        if (replicationMessage != null && replicationMessage.operation() != ReplicationMessage.Operation.DELETE) {
            ReplicationMessage replicationMessage2 = (ReplicationMessage) this.lookupStore.get(str);
            if (replicationMessage2 != null) {
                this.context.forward(record.withValue(((ReplicationMessage) record.value()).withParamMessage(replicationMessage2.message())));
            } else {
                this.context.forward(record);
            }
            this.lookupStore.put(str, replicationMessage);
            return;
        }
        logger.debug("Delete detected in store: {} with key: {}", this.lookupStoreName, str);
        ReplicationMessage replicationMessage3 = (ReplicationMessage) this.lookupStore.get(str);
        if (replicationMessage3 != null) {
            this.lookupStore.delete(str);
            this.context.forward(record.withValue(replicationMessage3.withOperation(ReplicationMessage.Operation.DELETE)));
        }
    }

    public void close() {
    }
}
