package io.floodplain.streams.remotejoin;

import io.floodplain.immutable.api.ImmutableMessage;
import io.floodplain.reactive.source.topology.api.TopologyPipeComponent;
import io.floodplain.replication.api.ReplicationMessage;
import io.floodplain.streams.api.Topic;
import io.floodplain.streams.api.TopologyContext;
import io.floodplain.streams.remotejoin.ranged.GroupedUpdateProcessor;
import io.floodplain.streams.remotejoin.ranged.ManyToManyGroupedProcessor;
import io.floodplain.streams.remotejoin.ranged.ManyToOneGroupedProcessor;
import io.floodplain.streams.remotejoin.ranged.OneToManyGroupedProcessor;
import io.floodplain.streams.serializer.ConnectReplicationMessageSerde;
import io.floodplain.streams.serializer.ImmutableMessageSerde;
import io.floodplain.streams.serializer.ReplicationMessageSerde;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Stack;
import java.util.function.BiFunction;
import java.util.function.Function;
import kotlin.jvm.functions.Function3;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/floodplain/streams/remotejoin/ReplicationTopologyParser.class */
public class ReplicationTopologyParser {
    public static final String STORE_PREFIX = "STORE_";
    private static final Serde<ReplicationMessage> messageSerde = new ReplicationMessageSerde();
    private static final Serde<ImmutableMessage> immutableMessageSerde = new ImmutableMessageSerde();
    private static final ReplicationMessageSerde replicationMessageSerde = new ReplicationMessageSerde();
    private static final ConnectReplicationMessageSerde connectReplicationMessageSerde = new ConnectReplicationMessageSerde();
    private static final Logger logger = LoggerFactory.getLogger(ReplicationTopologyParser.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.floodplain.streams.remotejoin.ReplicationTopologyParser$1, reason: invalid class name */
    /* loaded from: input_file:io/floodplain/streams/remotejoin/ReplicationTopologyParser$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat;
        static final /* synthetic */ int[] $SwitchMap$io$floodplain$streams$api$Topic$FloodplainBodyFormat = new int[Topic.FloodplainBodyFormat.values().length];

        static {
            try {
                $SwitchMap$io$floodplain$streams$api$Topic$FloodplainBodyFormat[Topic.FloodplainBodyFormat.CONNECT_JSON.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$floodplain$streams$api$Topic$FloodplainBodyFormat[Topic.FloodplainBodyFormat.FLOODPLAIN_JSON.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat = new int[Topic.FloodplainKeyFormat.values().length];
            try {
                $SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat[Topic.FloodplainKeyFormat.CONNECT_KEY_JSON.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat[Topic.FloodplainKeyFormat.FLOODPLAIN_STRING.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:io/floodplain/streams/remotejoin/ReplicationTopologyParser$Flatten.class */
    public enum Flatten {
        FIRST,
        LAST,
        NONE
    }

    private ReplicationTopologyParser() {
    }

    public static void addStateStoreMapping(Map<String, List<String>> map, String str, String str2) {
        logger.info("Adding processor: {} with statestore: {}", str, str2);
        map.computeIfAbsent(str2, str3 -> {
            return new ArrayList();
        }).add(str);
    }

    public static void materializeStateStores(TopologyConstructor topologyConstructor, Topology topology) {
        for (Map.Entry<String, List<String>> entry : topologyConstructor.processorStateStoreMapper.entrySet()) {
            String key = entry.getKey();
            StoreBuilder<KeyValueStore<String, ReplicationMessage>> storeBuilder = topologyConstructor.stateStoreSupplier.get(key);
            if (storeBuilder == null) {
                StoreBuilder<KeyValueStore<String, ImmutableMessage>> storeBuilder2 = topologyConstructor.immutableStoreSupplier.get(key);
                if (storeBuilder2 != null) {
                    topology = topology.addStateStore(storeBuilder2, (String[]) entry.getValue().toArray(new String[0]));
                    logger.info("Added processor: {} with sttstatestores: {} mappings: {}", new Object[]{entry.getKey(), entry.getValue(), topologyConstructor.processorStateStoreMapper.get(entry.getKey())});
                } else {
                    StoreBuilder<KeyValueStore<String, Long>> storeBuilder3 = topologyConstructor.longStoreSupplier.get(key);
                    if (storeBuilder3 == null) {
                        logger.error("Missing supplier for: {}\nStore mappings: {} available suppliers: {}", new Object[]{entry.getKey(), topologyConstructor.processorStateStoreMapper, topologyConstructor.immutableStoreSupplier});
                        logger.error("Available state stores: {}\nimm: {}", topologyConstructor.stateStoreSupplier.keySet(), topologyConstructor.immutableStoreSupplier.keySet());
                        throw new RuntimeException("Missing supplier for: " + entry.getKey());
                    }
                    topology = topology.addStateStore(storeBuilder3, (String[]) entry.getValue().toArray(new String[0]));
                    logger.info("Added processor: {} with sttstatestores: {} mappings: {}", new Object[]{entry.getKey(), entry.getValue(), topologyConstructor.processorStateStoreMapper.get(entry.getKey())});
                }
            } else {
                topology = topology.addStateStore(storeBuilder, (String[]) entry.getValue().toArray(new String[0]));
                logger.info("Added processor: {} with sttstatestores: {} mappings: {}", new Object[]{entry.getKey(), entry.getValue(), topologyConstructor.processorStateStoreMapper.get(entry.getKey())});
            }
        }
    }

    public static void addDiffProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2) {
        topology.addProcessor(str2, () -> {
            return new DiffProcessor(str2);
        }, new String[]{str});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, str2);
        logger.info("Granting access for processor: {} to store: {}", str2, str2);
        topologyConstructor.stateStoreSupplier.put(str2, createMessageStoreSupplier(str2, true));
    }

    public static void addCompareToProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, Function3<String, ImmutableMessage, ImmutableMessage, ImmutableMessage> function3) {
        topology.addProcessor(str2, () -> {
            return new CompareToProcessor(str2, function3);
        }, new String[]{str});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, str2);
        logger.info("Granting access for processor: {} to store: {}", str2, str2);
        topologyConstructor.stateStoreSupplier.put(str2, createMessageStoreSupplier(str2, true));
    }

    public static String addMaterializeStore(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2) {
        topology.addProcessor(str, () -> {
            return new StoreProcessor("STORE_" + str);
        }, new String[]{str2});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
        return str;
    }

    public static Deserializer<String> keyDeserializer(Topic.FloodplainKeyFormat floodplainKeyFormat) {
        switch (AnonymousClass1.$SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat[floodplainKeyFormat.ordinal()]) {
            case 1:
                return ConnectReplicationMessageSerde.keyDeserialize();
            case 2:
                return Serdes.String().deserializer();
            default:
                throw new IllegalArgumentException("Weird key format: " + floodplainKeyFormat);
        }
    }

    public static Serializer<String> keySerializer(Topic.FloodplainKeyFormat floodplainKeyFormat) {
        switch (AnonymousClass1.$SwitchMap$io$floodplain$streams$api$Topic$FloodplainKeyFormat[floodplainKeyFormat.ordinal()]) {
            case 1:
                return ConnectReplicationMessageSerde.keySerialize();
            case 2:
                return Serdes.String().serializer();
            default:
                throw new IllegalArgumentException("Weird key format: " + floodplainKeyFormat);
        }
    }

    public static Deserializer<ReplicationMessage> bodyDeserializer(Topic.FloodplainBodyFormat floodplainBodyFormat) {
        switch (AnonymousClass1.$SwitchMap$io$floodplain$streams$api$Topic$FloodplainBodyFormat[floodplainBodyFormat.ordinal()]) {
            case 1:
                return connectReplicationMessageSerde.deserializer();
            case 2:
                return replicationMessageSerde.deserializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + floodplainBodyFormat);
        }
    }

    public static Serializer<ReplicationMessage> bodySerializer(Topic.FloodplainBodyFormat floodplainBodyFormat) {
        switch (AnonymousClass1.$SwitchMap$io$floodplain$streams$api$Topic$FloodplainBodyFormat[floodplainBodyFormat.ordinal()]) {
            case 1:
                return connectReplicationMessageSerde.serializer();
            case 2:
                return replicationMessageSerde.serializer();
            default:
                throw new IllegalArgumentException("Weird body format: " + floodplainBodyFormat);
        }
    }

    public static String addSourceStore(Topology topology, TopologyConstructor topologyConstructor, Topic topic, Topic.FloodplainKeyFormat floodplainKeyFormat, Topic.FloodplainBodyFormat floodplainBodyFormat, boolean z) {
        return addSourceStore(topology, topologyConstructor, topic, keyDeserializer(floodplainKeyFormat), bodyDeserializer(floodplainBodyFormat), z);
    }

    public static String addSourceStore(Topology topology, TopologyConstructor topologyConstructor, Topic topic, Deserializer<String> deserializer, Deserializer<ReplicationMessage> deserializer2, boolean z) {
        String prefixedString = topic.prefixedString("SOURCE");
        if (!topologyConstructor.sources.containsKey(topic)) {
            String str = prefixedString + "_src";
            topology.addSource(str, deserializer, deserializer2, new String[]{topic.qualifiedString()});
            topologyConstructor.sources.put(topic, str);
            if (z) {
                topology.addProcessor(prefixedString, () -> {
                    return new StoreProcessor("STORE_" + prefixedString);
                }, new String[]{str});
            } else {
                topology.addProcessor(prefixedString, IdentityProcessor::new, new String[]{str});
            }
        }
        if (z) {
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, prefixedString, "STORE_" + prefixedString);
            topologyConstructor.stores.add("STORE_" + prefixedString);
            topologyConstructor.stateStoreSupplier.put("STORE_" + prefixedString, createMessageStoreSupplier("STORE_" + prefixedString, true));
        }
        logger.info("Granting access for processor: {} to store: {}", prefixedString, prefixedString);
        return prefixedString;
    }

    public static void addSingleJoinGrouped(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, String str3, boolean z, boolean z2, boolean z3) {
        String str4 = str2 + "-forwardpre";
        String str5 = str2 + "-reversepre";
        String str6 = str2 + "-joined";
        topology.addProcessor(str4, () -> {
            return new PreJoinProcessor(false);
        }, new String[]{str}).addProcessor(str5, () -> {
            return new PreJoinProcessor(true);
        }, new String[]{str3}).addProcessor(str6, !z3 ? () -> {
            return new ManyToOneGroupedProcessor(str, str3, z);
        } : () -> {
            return new ManyToManyGroupedProcessor(str, str3, z);
        }, new String[]{str4, str5});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str3);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str2, "STORE_" + str2);
        topologyConstructor.stores.add("STORE_" + str3);
        topologyConstructor.stores.add("STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str2);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str2, createMessageStoreSupplier("STORE_" + str2, true));
        topology.addProcessor(str2, () -> {
            return new StoreProcessor("STORE_" + str2);
        }, new String[]{str6});
    }

    public static String addGroupedProcessor(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, Function<ReplicationMessage, String> function) {
        if (!topologyConstructor.stores.contains("STORE_" + str2)) {
            logger.error("Adding grouped with from, no source processor present for: " + str2 + " created: " + topologyConstructor.stateStoreSupplier.keySet() + " and from: " + str2);
        }
        String str3 = str2 + "_mapping";
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stores.add("STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str3);
        topologyConstructor.stores.add("STORE_" + str3);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
        topologyConstructor.stateStoreSupplier.put("STORE_" + str3, createMessageStoreSupplier("STORE_" + str3, true));
        topology.addProcessor(str, () -> {
            return new GroupedUpdateProcessor("STORE_" + str, function, "STORE_" + str3);
        }, new String[]{str2});
        return str;
    }

    public static void addPersistentCache(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, Duration duration, int i, boolean z) {
        topology.addProcessor(str, () -> {
            return new CacheProcessor(str, duration, i, z);
        }, new String[]{str2});
        logger.info("Buffer using statestore: {}", "STORE_" + str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str, "STORE_" + str);
        topologyConstructor.stateStoreSupplier.put("STORE_" + str, createMessageStoreSupplier("STORE_" + str, true));
    }

    public static String addReducer(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, Stack<String> stack, int i, List<TopologyPipeComponent> list, List<TopologyPipeComponent> list2, Function<ImmutableMessage, ImmutableMessage> function, boolean z, Optional<BiFunction<ImmutableMessage, ImmutableMessage, String>> optional) {
        String peek = stack.peek();
        String qualifiedName = topologyContext.qualifiedName("reduce", stack.size(), i);
        stack.push(qualifiedName);
        String qualifiedName2 = topologyContext.qualifiedName("ifelse", stack.size(), i);
        stack.push(qualifiedName2);
        int generateNewStreamId = topologyConstructor.generateNewStreamId();
        int generateNewStreamId2 = topologyConstructor.generateNewStreamId();
        String qualifiedName3 = topologyContext.qualifiedName("addbranch", stack.size(), i);
        String qualifiedName4 = topologyContext.qualifiedName("removeBranch", stack.size(), i);
        String qualifiedName5 = topologyContext.qualifiedName("reduce", stack.size(), i);
        String str = "STORE_accumulator_" + qualifiedName5;
        String str2 = "STORE_reduce_inputstore_" + qualifiedName5;
        topology.addProcessor(qualifiedName, () -> {
            return new ReduceReadProcessor(str2, str, function, optional);
        }, new String[]{peek});
        topology.addProcessor(qualifiedName2, () -> {
            return new IfElseProcessor(replicationMessage -> {
                return replicationMessage.operation() != ReplicationMessage.Operation.DELETE;
            }, qualifiedName3, Optional.of(qualifiedName4));
        }, new String[]{qualifiedName});
        Stack<String> stack2 = new Stack<>();
        stack2.addAll(stack);
        topology.addProcessor(qualifiedName3, IdentityProcessor::new, new String[]{stack2.peek()});
        stack2.push(qualifiedName3);
        Stack<String> stack3 = new Stack<>();
        stack3.addAll(stack);
        topology.addProcessor(qualifiedName4, IdentityProcessor::new, new String[]{stack3.peek()});
        stack3.push(qualifiedName4);
        Iterator<TopologyPipeComponent> it = list.iterator();
        while (it.hasNext()) {
            it.next().addToTopology(stack2, generateNewStreamId, topology, topologyContext, topologyConstructor);
        }
        String qualifiedName6 = topologyContext.qualifiedName("primToSecondaryAdd", stack.size(), i);
        topology.addProcessor(qualifiedName6, PrimaryToSecondaryProcessor::new, new String[]{stack2.peek()});
        stack2.push(qualifiedName6);
        Iterator<TopologyPipeComponent> it2 = list2.iterator();
        while (it2.hasNext()) {
            it2.next().addToTopology(stack3, generateNewStreamId2, topology, topologyContext, topologyConstructor);
        }
        String qualifiedName7 = topologyContext.qualifiedName("primToSecondaryRemove", stack.size(), i);
        topology.addProcessor(qualifiedName7, PrimaryToSecondaryProcessor::new, new String[]{stack3.peek()});
        stack3.push(qualifiedName7);
        topology.addProcessor(z ? "_proc" + qualifiedName5 : qualifiedName5, () -> {
            return new StoreStateProcessor(str);
        }, new String[]{stack2.peek(), stack3.peek()});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, z ? "_proc" + qualifiedName5 : qualifiedName5, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, qualifiedName, str);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, qualifiedName, str2);
        if (!topologyConstructor.immutableStoreSupplier.containsKey(str)) {
            topologyConstructor.immutableStoreSupplier.put(str, createImmutableMessageSupplier(str, true));
        }
        if (!topologyConstructor.stateStoreSupplier.containsKey(str2)) {
            topologyConstructor.stateStoreSupplier.put(str2, createMessageStoreSupplier(str2, true));
        }
        if (z) {
            addMaterializeStore(topology, topologyContext, topologyConstructor, qualifiedName5, "_proc" + qualifiedName5);
        }
        return qualifiedName5;
    }

    public static void addJoin(Topology topology, TopologyContext topologyContext, TopologyConstructor topologyConstructor, String str, String str2, String str3, boolean z, boolean z2, boolean z3, boolean z4) {
        String str4 = str3 + "-forwardpre";
        String str5 = str3 + "-reversepre";
        topology.addProcessor(str4, () -> {
            return new PreJoinProcessor(false);
        }, new String[]{str}).addProcessor(str5, () -> {
            return new PreJoinProcessor(true);
        }, new String[]{str2});
        ProcessorSupplier processorSupplier = z2 ? () -> {
            return new OneToManyGroupedProcessor("STORE_" + str, "STORE_" + str2, z, z4);
        } : () -> {
            return new OneToOneProcessor("STORE_" + str, "STORE_" + str2, z, (replicationMessage, replicationMessage2) -> {
                return replicationMessage.withParamMessage(replicationMessage2.message());
            });
        };
        String str6 = z3 ? "proc_" + str3 : str3;
        topology.addProcessor(str6, processorSupplier, new String[]{str4, str5});
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str2);
        addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str6, "STORE_" + str);
        if (z3) {
            topologyConstructor.stores.add("STORE_" + str3);
            topologyConstructor.stateStoreSupplier.put("STORE_" + str3, createMessageStoreSupplier("STORE_" + str3, true));
            addStateStoreMapping(topologyConstructor.processorStateStoreMapper, str3, "STORE_" + str3);
            topology.addProcessor(str3, () -> {
                return new StoreProcessor("STORE_" + str3);
            }, new String[]{str6});
        }
    }

    public static StoreBuilder<KeyValueStore<String, ReplicationMessage>> createMessageStoreSupplier(String str, boolean z) {
        if (!z) {
            logger.info("Creating non-persistent messagestore supplier: {}", str);
        }
        return Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str), Serdes.String(), messageSerde);
    }

    public static StoreBuilder<KeyValueStore<String, ImmutableMessage>> createImmutableMessageSupplier(String str, boolean z) {
        if (!z) {
            logger.info("Creating non-persistent messagestore supplier: {}", str);
        }
        return Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str), Serdes.String(), immutableMessageSerde);
    }

    public static StoreBuilder<KeyValueStore<String, Long>> createLongStoreSupplier(String str, boolean z) {
        if (!z) {
            logger.info("Creating non-persistent messagestore supplier: {}", str);
        }
        return Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(str) : Stores.inMemoryKeyValueStore(str), Serdes.String(), Serdes.Long());
    }
}
