package org.apache.kafka.streams.integration;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.class */
public class KTableKTableForeignKeyJoinIntegrationTest {
    protected static final String LEFT_TABLE = "left_table";
    protected static final String RIGHT_TABLE = "right_table";
    protected static final String OUTPUT = "output-topic";
    private static final String REJOIN_OUTPUT = "rejoin-output-topic";
    private final MockTime time = new MockTime();
    protected long baseTimestamp;

    @BeforeEach
    public void before() {
        this.baseTimestamp = this.time.milliseconds();
    }

    private static Properties getStreamsProperties(String str) {
        return Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()), Utils.mkEntry("topology.optimization", str)}));
    }

    private static Collection<Object[]> data() {
        return buildParameters(Arrays.asList(true, false), Arrays.asList("all", "none"), Arrays.asList(true, false), Arrays.asList(true, false), Collections.singletonList(false), Collections.singletonList(false));
    }

    private static Collection<Object[]> versionedData() {
        return buildParameters(Arrays.asList(true, false), Collections.singletonList("none"), Arrays.asList(true, false), Collections.singletonList(false), Arrays.asList(true, false), Arrays.asList(true, false));
    }

    private static Stream<Arguments> testCases() {
        return ((Set) Stream.concat(data().stream().map(Arrays::asList), versionedData().stream().map(Arrays::asList)).collect(Collectors.toSet())).stream().map(list -> {
            return Arguments.of(list.toArray());
        });
    }

    private static Stream<Arguments> testCasesWithoutLeftJoinArg() {
        return ((Set) testCases().map(arguments -> {
            return Arrays.asList(Arrays.copyOfRange(arguments.get(), 1, arguments.get().length));
        }).collect(Collectors.toSet())).stream().map(list -> {
            return Arguments.of(list.toArray());
        });
    }

    private static Stream<Arguments> versionedDataTestCases() {
        return versionedData().stream().map(Arguments::of);
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected static Collection<Object[]> buildParameters(List<?>... listArr) {
        List linkedList = new LinkedList();
        linkedList.add(new Object[0]);
        for (List<?> list : listArr) {
            linkedList = times(linkedList, list);
        }
        return linkedList;
    }

    private static List<Object[]> times(List<Object[]> list, List<?> list2) {
        LinkedList linkedList = new LinkedList();
        for (Object[] objArr : list) {
            for (Object obj : list2) {
                Object[] objArr2 = new Object[objArr.length + 1];
                System.arraycopy(objArr, 0, objArr2, 0, objArr.length);
                objArr2[objArr.length] = obj;
                linkedList.add(objArr2);
            }
        }
        return linkedList;
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void doJoinFromLeftThenDeleteLeftEntity(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                TestOutputTopic createOutputTopic2 = z3 ? topologyTestDriver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp);
                createInputTopic.pipeInput("rhs2", "rhsValue2", this.baseTimestamp + 1);
                createInputTopic.pipeInput("rhs3", "rhsValue3", this.baseTimestamp + 2);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Collections.emptyList()));
                if (z3) {
                    MatcherAssert.assertThat(createOutputTopic2.readKeyValuesToList(), CoreMatchers.is(Collections.emptyList()));
                }
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 3);
                createInputTopic2.pipeInput("lhs2", "lhsValue2|rhs2", this.baseTimestamp + 4);
                List asList = Arrays.asList(KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"), KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)"));
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(asList));
                if (z3) {
                    MatcherAssert.assertThat(createOutputTopic2.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(KeyValue.pair("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"), KeyValue.pair("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)"))));
                }
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(asList.stream().collect(Collectors.toMap(keyValue -> {
                        return (String) keyValue.key;
                    }, keyValue2 -> {
                        return (String) keyValue2.value;
                    }))));
                }
                createInputTopic2.pipeInput("lhs3", "lhsValue3|rhs1", this.baseTimestamp + 5);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(new KeyValue("lhs3", "(lhsValue3|rhs1,rhsValue1)"))));
                if (z3) {
                    MatcherAssert.assertThat(createOutputTopic2.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(new KeyValue("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)"))));
                }
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                }
                createInputTopic2.pipeInput("lhs1", (String) null, this.baseTimestamp + 6);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(new KeyValue("lhs1", (Object) null))));
                if (z3) {
                    MatcherAssert.assertThat(createOutputTopic2.readKeyValuesToList(), CoreMatchers.hasItem(KeyValue.pair("lhs1", (Object) null)));
                }
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void doJoinFromLeftThenUpdateFkThenRevertBack(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                TestOutputTopic createOutputTopic2 = z3 ? topologyTestDriver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp);
                createInputTopic.pipeInput("rhs2", "rhsValue2", this.baseTimestamp + 1);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Collections.emptyList()));
                if (z3) {
                    MatcherAssert.assertThat(createOutputTopic2.readKeyValuesToList(), CoreMatchers.is(Collections.emptyList()));
                }
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 3);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"))));
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs2", this.baseTimestamp + 5);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(new KeyValue("lhs1", "(lhsValue1|rhs2,rhsValue2)"))));
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 6);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToList(), CoreMatchers.is(Arrays.asList(new KeyValue("lhs1", "(lhsValue1|rhs1,rhsValue1)"))));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void doJoinFromRightThenDeleteRightEntity(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp);
                createInputTopic2.pipeInput("lhs2", "lhsValue2|rhs2", this.baseTimestamp + 1);
                createInputTopic2.pipeInput("lhs3", "lhsValue3|rhs1", this.baseTimestamp + 2);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,null)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,null)")}) : Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,null)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,null)")}) : Collections.emptyMap()));
                }
                createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp + 3);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,null)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")}) : Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                }
                createInputTopic.pipeInput("rhs2", "rhsValue2", this.baseTimestamp + 4);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                }
                createInputTopic.pipeInput("rhs3", "rhsValue3", this.baseTimestamp + 5);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")})));
                }
                createInputTopic.pipeInput("rhs1", (String) null, this.baseTimestamp + 6);
                Map readKeyValuesToMap = createOutputTopic.readKeyValuesToMap();
                Map.Entry[] entryArr = new Map.Entry[2];
                entryArr[0] = Utils.mkEntry("lhs1", z ? "(lhsValue1|rhs1,null)" : null);
                entryArr[1] = Utils.mkEntry("lhs3", z ? "(lhsValue3|rhs1,null)" : null);
                MatcherAssert.assertThat(readKeyValuesToMap, CoreMatchers.is(Utils.mkMap(entryArr)));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)"), Utils.mkEntry("lhs3", "(lhsValue3|rhs1,null)")}) : Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")})));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void shouldEmitTombstoneWhenDeletingNonJoiningRecords(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp);
                Map mkMap = z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)")}) : Collections.emptyMap();
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap));
                }
                createInputTopic.pipeInput("lhs1", (String) null, this.baseTimestamp + 1);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", (Object) null)})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                createInputTopic.pipeInput("lhs1", (String) null, this.baseTimestamp + 2);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void shouldNotEmitTombstonesWhenDeletingNonExistingRecords(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("lhs1", (String) null, this.baseTimestamp);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void joinShouldProduceNullsWhenValueHasNonMatchingForeignKey(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)")}) : Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)")}) : Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs2", this.baseTimestamp + 1);
                Map readKeyValuesToMap = createOutputTopic.readKeyValuesToMap();
                Map.Entry[] entryArr = new Map.Entry[1];
                entryArr[0] = Utils.mkEntry("lhs1", z ? "(lhsValue1|rhs2,null)" : null);
                MatcherAssert.assertThat(readKeyValuesToMap, CoreMatchers.is(Utils.mkMap(entryArr)));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs2,null)")}) : Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs3", this.baseTimestamp + 2);
                Map readKeyValuesToMap2 = createOutputTopic.readKeyValuesToMap();
                Map.Entry[] entryArr2 = new Map.Entry[1];
                entryArr2[0] = Utils.mkEntry("lhs1", z ? "(lhsValue1|rhs3,null)" : null);
                MatcherAssert.assertThat(readKeyValuesToMap2, CoreMatchers.is(Utils.mkMap(entryArr2)));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs3,null)")}) : Collections.emptyMap()));
                }
                createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp + 3);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs3,null)")}) : Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 4);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")})));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs2", this.baseTimestamp + 5);
                Map readKeyValuesToMap3 = createOutputTopic.readKeyValuesToMap();
                Map.Entry[] entryArr3 = new Map.Entry[1];
                entryArr3[0] = Utils.mkEntry("lhs1", z ? "(lhsValue1|rhs2,null)" : null);
                MatcherAssert.assertThat(readKeyValuesToMap3, CoreMatchers.is(Utils.mkMap(entryArr3)));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(z ? Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs2,null)")}) : Collections.emptyMap()));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCases"})
    @ParameterizedTest
    public void shouldUnsubscribeOldForeignKeyIfLeftSideIsUpdated(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
                TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp);
                createInputTopic.pipeInput("rhs2", "rhsValue2", this.baseTimestamp + 1);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 2);
                Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap));
                }
                createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs2", this.baseTimestamp + 3);
                Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap2));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap2));
                }
                createInputTopic.pipeInput("rhs1", "rhsValue1Delta", this.baseTimestamp + 4);
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs2,rhsValue2)")})));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCasesWithoutLeftJoinArg"})
    @ParameterizedTest
    public void shouldEmitRecordOnNullForeignKeyForLeftJoins(String str, boolean z, boolean z2, boolean z3, boolean z4) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z ? "store" : null, true, z2, z3, z4, str2 -> {
            return null;
        }), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                createInputTopic.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp);
                Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap));
                if (z) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap));
                }
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    @MethodSource({"testCasesWithoutLeftJoinArg"})
    @ParameterizedTest
    public void shouldEmitRecordWhenOldAndNewFkDiffer(String str, boolean z, boolean z2, boolean z3, boolean z4) {
        Function function = str2 -> {
            String str2 = str2.split("\\|")[1];
            if (str2.equals("returnNull")) {
                return null;
            }
            return str2;
        };
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z ? "store" : null, true, z2, z3, z4, function), streamsProperties);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
                KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
                KeyValueStore keyValueStore2 = topologyTestDriver.getKeyValueStore((String) ((Map.Entry) topologyTestDriver.getAllStateStores().entrySet().stream().filter(entry -> {
                    return ((String) entry.getKey()).contains("SUBSCRIPTION-STATE-STORE");
                }).findAny().orElseThrow(() -> {
                    return new RuntimeException("couldn't find store");
                })).getKey());
                Bytes subscriptionStoreKey = subscriptionStoreKey("lhs1", "rhs1");
                createInputTopic.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp);
                Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,null)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap));
                if (z) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap));
                }
                Assertions.assertNotNull(keyValueStore2.get(subscriptionStoreKey));
                createInputTopic.pipeInput("lhs1", "lhsValue1|returnNull", this.baseTimestamp);
                Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|returnNull,null)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap2));
                if (z) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap2));
                }
                Assertions.assertNull(keyValueStore2.get(subscriptionStoreKey));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    private static Bytes subscriptionStoreKey(String str, String str2) {
        byte[] bytes = str.getBytes();
        byte[] bytes2 = str2.getBytes();
        ByteBuffer allocate = ByteBuffer.allocate(4 + bytes.length + bytes2.length);
        allocate.putInt(bytes2.length);
        allocate.put(bytes2);
        allocate.put(bytes);
        return Bytes.wrap(allocate.array());
    }

    protected static Map<String, String> asMap(KeyValueStore<String, String> keyValueStore) {
        HashMap hashMap = new HashMap();
        keyValueStore.all().forEachRemaining(keyValue -> {
        });
        return hashMap;
    }

    protected static List<KeyValue<String, String>> makeList(KeyValueStore<String, ValueAndTimestamp<String>> keyValueStore) {
        LinkedList linkedList = new LinkedList();
        keyValueStore.all().forEachRemaining(keyValue -> {
            linkedList.add(new KeyValue(keyValue.key, ((ValueAndTimestamp) keyValue.value).value()));
        });
        return linkedList;
    }

    protected static Topology getTopology(Properties properties, String str, boolean z, boolean z2, boolean z3, boolean z4) {
        return getTopology(properties, str, z, z2, z3, z4, str2 -> {
            return str2.split("\\|")[1];
        });
    }

    protected static Topology getTopology(Properties properties, String str, boolean z, boolean z2, boolean z3, boolean z4, Function<String, String> function) {
        UniqueTopicSerdeScope uniqueTopicSerdeScope = new UniqueTopicSerdeScope();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable table = z3 ? streamsBuilder.table(LEFT_TABLE, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, true), uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)), Materialized.as(Stores.persistentVersionedKeyValueStore("left", Duration.ofMinutes(5L)))) : streamsBuilder.table(LEFT_TABLE, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, true), uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)));
        KTable table2 = z4 ? streamsBuilder.table(RIGHT_TABLE, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, true), uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)), Materialized.as(Stores.persistentVersionedKeyValueStore("right", Duration.ofMinutes(5L)))) : streamsBuilder.table(RIGHT_TABLE, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, true), uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)));
        ValueJoiner valueJoiner = (str2, str3) -> {
            return "(" + str2 + "," + str3 + ")";
        };
        ValueJoiner valueJoiner2 = z2 ? (str4, str5) -> {
            return "rejoin(" + str4 + "," + str5 + ")";
        } : null;
        Materialized withCachingDisabled = str == null ? Materialized.with((Serde) null, uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)).withCachingDisabled() : Materialized.as(Stores.inMemoryKeyValueStore(str)).withValueSerde(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)).withCachingDisabled();
        Materialized with = !z2 ? null : str == null ? Materialized.with((Serde) null, uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)) : Materialized.as(Stores.inMemoryKeyValueStore(str + "-rejoin")).withValueSerde(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), properties, false)).withCachingDisabled();
        if (z) {
            KTable leftJoin = table.leftJoin(table2, function, valueJoiner, withCachingDisabled);
            leftJoin.toStream().to(OUTPUT);
            if (z2) {
                leftJoin.leftJoin(table, valueJoiner2, with).toStream().to(REJOIN_OUTPUT);
            }
        } else {
            KTable join = table.join(table2, function, valueJoiner, withCachingDisabled);
            join.toStream().to(OUTPUT);
            if (z2) {
                join.join(table, valueJoiner2, with).toStream().to(REJOIN_OUTPUT);
            }
        }
        return streamsBuilder.build(properties);
    }

    @MethodSource({"versionedDataTestCases"})
    @ParameterizedTest
    public void shouldIgnoreOutOfOrderRecordsIffVersioned(boolean z, String str, boolean z2, boolean z3, boolean z4, boolean z5) {
        Properties streamsProperties = getStreamsProperties(str);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(getTopology(streamsProperties, z2 ? "store" : null, z, z3, z4, z5), streamsProperties);
        Throwable th = null;
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
            TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            KeyValueStore keyValueStore = topologyTestDriver.getKeyValueStore("store");
            createInputTopic.pipeInput("rhs1", "rhsValue1", this.baseTimestamp + 4);
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
            if (z2) {
                MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
            }
            createInputTopic2.pipeInput("lhs1", "lhsValue1|rhs1", this.baseTimestamp + 3);
            createInputTopic2.pipeInput("lhs2", "lhsValue2|rhs1", this.baseTimestamp + 5);
            Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2|rhs1,rhsValue1)")});
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap));
            if (z2) {
                MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap));
            }
            createInputTopic2.pipeInput("lhs2", (Object) null, this.baseTimestamp + 6);
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs2", (Object) null)})));
            if (z2) {
                MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")})));
            }
            createInputTopic2.pipeInput("lhs1", "lhsValue1_ooo|rhs1", this.baseTimestamp + 2);
            createInputTopic2.pipeInput("lhs2", "lhsValue2_ooo|rhs1", this.baseTimestamp + 2);
            if (z4) {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")})));
                }
            } else {
                Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_ooo|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2_ooo|rhs1,rhsValue1)")});
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap2));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap2));
                }
            }
            createInputTopic2.pipeInput("lhs1", (Object) null, this.baseTimestamp + 2);
            if (z4) {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)")})));
                }
            } else {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", (Object) null)})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs2", "(lhsValue2_ooo|rhs1,rhsValue1)")})));
                }
            }
            createInputTopic2.pipeInput("lhs1", "lhsValue1_new|rhs1", this.baseTimestamp + 8);
            createInputTopic2.pipeInput("lhs2", "lhsValue2_new|rhs1", this.baseTimestamp + 8);
            Map mkMap3 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")});
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(mkMap3));
            if (z2) {
                MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(mkMap3));
            }
            createInputTopic.pipeInput("rhs1", "rhsValue1_ooo", this.baseTimestamp + 1);
            if (z5) {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")})));
                }
            } else {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_ooo)")})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_ooo)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_ooo)")})));
                }
            }
            createInputTopic.pipeInput("rhs1", (Object) null, this.baseTimestamp + 1);
            if (z5) {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Collections.emptyMap()));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1)")})));
                }
            } else if (z) {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,null)")})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,null)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,null)")})));
                }
            } else {
                MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", (Object) null), Utils.mkEntry("lhs2", (Object) null)})));
                if (z2) {
                    MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Collections.emptyMap()));
                }
            }
            createInputTopic.pipeInput("rhs1", "rhsValue1_new", this.baseTimestamp + 6);
            MatcherAssert.assertThat(createOutputTopic.readKeyValuesToMap(), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_new)")})));
            if (z2) {
                MatcherAssert.assertThat(asMap(keyValueStore), CoreMatchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("lhs1", "(lhsValue1_new|rhs1,rhsValue1_new)"), Utils.mkEntry("lhs2", "(lhsValue2_new|rhs1,rhsValue1_new)")})));
            }
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }
}
