package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImplTest.class */
public class SlidingWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private TimeWindowedKStream<String, String> windowedStream;

    @Before
    public void before() {
        this.windowedStream = this.builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())).windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(1000L)));
    }

    @Test
    public void shouldCountSlidingWindows() {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        this.windowedStream.count().toStream().process(mockProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                processData(topologyTestDriver);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(0L, 100L))), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 100L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(101L, 201L))), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(400L, 500L))), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 500L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(100L, 200L))), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 200L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(151L, 251L))), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 200L)));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldReduceSlidingWindows() {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        this.windowedStream.reduce(MockReducer.STRING_ADDER).toStream().process(mockProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                processData(topologyTestDriver);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(0L, 100L))), CoreMatchers.equalTo(ValueAndTimestamp.make("1", 100L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(101L, 201L))), CoreMatchers.equalTo(ValueAndTimestamp.make("2", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make("1+2", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(400L, 500L))), CoreMatchers.equalTo(ValueAndTimestamp.make("3", 500L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(100L, 200L))), CoreMatchers.equalTo(ValueAndTimestamp.make("10+20", 200L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make("20", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(151L, 251L))), CoreMatchers.equalTo(ValueAndTimestamp.make("10", 200L)));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldAggregateSlidingWindows() {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        this.windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.with(Serdes.String(), Serdes.String())).toStream().process(mockProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                processData(topologyTestDriver);
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(0L, 100L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+1", 100L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(101L, 201L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+2", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+1+2", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("1", new TimeWindow(400L, 500L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+3", 500L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(100L, 200L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+10+20", 200L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(50L, 150L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+20", 150L)));
                MatcherAssert.assertThat(mockProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey().get(new Windowed("2", new TimeWindow(151L, 251L))), CoreMatchers.equalTo(ValueAndTimestamp.make("0+10", 200L)));
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMaterializeCount() {
        this.windowedStream.count(Materialized.as("count-store").withKeySerde(Serdes.String()).withValueSerde(Serdes.Long()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getWindowStore("count-store").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), 1L), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), 2L), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), 1L), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), 1L), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), 1L), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), 2L), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), 1L))));
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getTimestampedWindowStore("count-store").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make(1L, 100L)), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make(2L, 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make(1L, 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make(1L, 500L)), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make(1L, 150L)), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make(2L, 200L)), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make(1L, 200L)))));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMaterializeReduced() {
        this.windowedStream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getWindowStore("reduced").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), "1"), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), "1+2"), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), "2"), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), "3"), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), "20"), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), "10+20"), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), "10"))));
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getTimestampedWindowStore("reduced").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make("1", 100L)), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("1+2", 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make("2", 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make("3", 500L)), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("20", 150L)), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make("10+20", 200L)), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make("10", 200L)))));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMaterializeAggregated() {
        this.windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as("aggregated").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            processData(topologyTestDriver);
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getWindowStore("aggregated").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), "0+1"), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), "0+1+2"), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), "0+2"), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), "0+3"), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), "0+20"), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), "0+10+20"), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), "0+10"))));
            MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getTimestampedWindowStore("aggregated").fetch("1", "2", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(0L, 100L)), ValueAndTimestamp.make("0+1", 100L)), KeyValue.pair(new Windowed("1", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("0+1+2", 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(101L, 201L)), ValueAndTimestamp.make("0+2", 150L)), KeyValue.pair(new Windowed("1", new TimeWindow(400L, 500L)), ValueAndTimestamp.make("0+3", 500L)), KeyValue.pair(new Windowed("2", new TimeWindow(50L, 150L)), ValueAndTimestamp.make("0+20", 150L)), KeyValue.pair(new Windowed("2", new TimeWindow(100L, 200L)), ValueAndTimestamp.make("0+10+20", 200L)), KeyValue.pair(new Windowed("2", new TimeWindow(151L, 251L)), ValueAndTimestamp.make("0+10", 200L)))));
            if (topologyTestDriver != null) {
                if (0 == 0) {
                    topologyTestDriver.close();
                    return;
                }
                try {
                    topologyTestDriver.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (0 != 0) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER);
        });
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.aggregate(MockInitializer.STRING_INIT, (Aggregator) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.reduce((Reducer) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, Materialized.as("store"));
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.aggregate(MockInitializer.STRING_INIT, (Aggregator) null, Materialized.as("store"));
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.reduce((Reducer) null, Materialized.as("store"));
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null);
        });
    }

    @Test
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowedStream.count((Materialized) null);
        });
    }

    @Test
    public void shouldThrowIllegalArgumentWhenRetentionIsTooSmall() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as("aggregated").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withRetention(Duration.ofMillis(1L)));
        });
    }

    @Test
    public void shouldDropWindowsOutsideOfRetention() {
        this.windowedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as(Stores.inMemoryWindowStore("aggregated", Duration.ofMillis(1200L), Duration.ofMillis(100L), false)).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withCachingDisabled());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
                createInputTopic.pipeInput("1", "2", 100L);
                createInputTopic.pipeInput("1", "3", 500L);
                createInputTopic.pipeInput("1", "4", 799L);
                createInputTopic.pipeInput("1", "4", 1000L);
                createInputTopic.pipeInput("1", "5", 2000L);
                MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getWindowStore("aggregated").fetch("1", "1", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(10000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(900L, 1000L)), "0+4"), KeyValue.pair(new Windowed("1", new TimeWindow(1900L, 2000L)), "0+5"))));
                MatcherAssert.assertThat(StreamsTestUtils.toList(topologyTestDriver.getTimestampedWindowStore("aggregated").fetch("1", "1", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(2000L))), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new TimeWindow(900L, 1000L)), ValueAndTimestamp.make("0+4", 1000L)), KeyValue.pair(new Windowed("1", new TimeWindow(1900L, 2000L)), ValueAndTimestamp.make("0+5", 2000L)))));
                if (topologyTestDriver != null) {
                    if (0 == 0) {
                        topologyTestDriver.close();
                        return;
                    }
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th4;
        }
    }

    private void processData(TopologyTestDriver topologyTestDriver) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
        createInputTopic.pipeInput("1", "1", 100L);
        createInputTopic.pipeInput("1", "2", 150L);
        createInputTopic.pipeInput("1", "3", 500L);
        createInputTopic.pipeInput("2", "10", 200L);
        createInputTopic.pipeInput("2", "20", 150L);
    }
}
