package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.immutable.factory.ImmutableFactory;
import io.floodplain.replication.api.ReplicationMessage;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ReduceReadProcessor.class */
public class ReduceReadProcessor extends AbstractProcessor<String, ReplicationMessage> {
    private final String accumulatorStoreName;
    private final String inputStoreName;
    private final Function<ImmutableMessage, ImmutableMessage> initial;
    private final Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> keyExtractor;
    private KeyValueStore<String, ImmutableMessage> accumulatorStore;
    private KeyValueStore<String, ReplicationMessage> inputStore;

    public ReduceReadProcessor(String str, String str2, Function<ImmutableMessage, ImmutableMessage> function, Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> optional) {
        this.accumulatorStoreName = str2;
        this.inputStoreName = str;
        this.initial = function;
        this.keyExtractor = optional;
    }

    public void init(ProcessorContext processorContext) {
        this.accumulatorStore = processorContext.getStateStore(this.accumulatorStoreName);
        this.inputStore = processorContext.getStateStore(this.inputStoreName);
        super.init(processorContext);
    }

    public void process(String str, ReplicationMessage replicationMessage) {
        String apply;
        ReplicationMessage withParamMessage;
        ReplicationMessage replicationMessage2 = (ReplicationMessage) this.inputStore.get(str);
        if (replicationMessage2 != null) {
            apply = this.keyExtractor.orElse((immutableMessage, immutableMessage2) -> {
                return StoreStateProcessor.COMMONKEY;
            }).apply(replicationMessage2.message(), (ImmutableMessage) replicationMessage2.paramMessage().orElse(ImmutableFactory.empty()));
        } else {
            if (replicationMessage == null || replicationMessage.operation() == ReplicationMessage.Operation.DELETE) {
                throw new RuntimeException("Issue: Deleting (?) a message that isn't there. Is this bad?");
            }
            apply = this.keyExtractor.orElse((immutableMessage3, immutableMessage4) -> {
                return StoreStateProcessor.COMMONKEY;
            }).apply(replicationMessage.message(), (ImmutableMessage) replicationMessage.paramMessage().orElse(ImmutableFactory.empty()));
        }
        ImmutableMessage immutableMessage5 = (ImmutableMessage) this.accumulatorStore.get(apply);
        this.inputStore.put(str, replicationMessage);
        if (replicationMessage != null && replicationMessage.operation() != ReplicationMessage.Operation.DELETE) {
            if (immutableMessage5 == null) {
                immutableMessage5 = this.initial.apply(replicationMessage.message());
            }
            ReplicationMessage withParamMessage2 = replicationMessage.withParamMessage(immutableMessage5);
            if (replicationMessage2 != null) {
                forwardMessage(str, replicationMessage2.withOperation(ReplicationMessage.Operation.DELETE).withParamMessage(immutableMessage5 != null ? immutableMessage5 : this.initial.apply(replicationMessage.message())));
                immutableMessage5 = (ImmutableMessage) this.accumulatorStore.get(apply);
            }
            withParamMessage = withParamMessage2.withParamMessage(immutableMessage5);
        } else {
            if (replicationMessage2 == null) {
                throw new RuntimeException("Issue: Deleting a message that isn't there. Is this bad?");
            }
            withParamMessage = replicationMessage2.withOperation(ReplicationMessage.Operation.DELETE).withParamMessage(immutableMessage5 == null ? this.initial.apply(replicationMessage2.message()) : immutableMessage5);
            this.inputStore.delete(str);
        }
        forwardMessage(str, withParamMessage);
    }

    private void forwardMessage(String str, ReplicationMessage replicationMessage) {
        context().forward(str, replicationMessage);
    }
}
