package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTaskTest.class */
public class StreamTaskTest {
    // Temporary directory used as the "state.dir" of every config built by createConfig(); deleted in cleanup().
    private static final File BASE_DIR = TestUtils.tempDirectory();
    // Producer instance most recently created by a producer supplier passed to a StreamTask under test.
    private MockProducer<byte[], byte[]> producer;
    // Re-created before every test in setup().
    private StateDirectory stateDirectory;
    // Task under test; closed (best effort) in cleanup() when non-null.
    private StreamTask task;
    // Written by the punctuator field declared below.
    private long punctuatedAt;
    private static final String APPLICATION_ID = "stream-task-test";
    private static final long DEFAULT_TIMESTAMP = 1000;
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Serializer<byte[]> bytesSerializer = Serdes.ByteArray().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    // Both input partitions use partition number 1 of their respective topic.
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet(new TopicPartition[]{this.partition1, this.partition2});
    // Source nodes reading Integer keys/values from topic1 and topic2 respectively.
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<>(new String[]{"topic1"}, this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer);
    // Source node for topic2 whose process() and close() always fail with RuntimeException("KABOOM!").
    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(new String[]{"topic2"}, this.intDeserializer, this.intDeserializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.1
        @Override // org.apache.kafka.test.MockSourceNode
        public void process(Integer num, Integer num2) {
            throw new RuntimeException("KABOOM!");
        }

        @Override // org.apache.kafka.test.MockSourceNode
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    // NOTE(review): the argument 10 is presumably the punctuation interval; the single-argument
    // constructor is used for stream-time punctuation checks in the tests - confirm against MockProcessorNode.
    private final MockProcessorNode<Integer, Integer> processorStreamTime = new MockProcessorNode<>(10);
    private final MockProcessorNode<Integer, Integer> processorSystemTime = new MockProcessorNode<>(10, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    private final StateStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 0);
    // Offset reported as restored for changelogPartition by the changelogReader stub below.
    private final Long offset = 543L;
    // Default topology: topic1 -> source1, topic2 -> source2, plus both punctuating processor nodes.
    private final ProcessorTopology topology = withSources(Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)}));
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    // Changelog reader stub: restoredOffsets() always returns the single mapping changelogPartition -> offset.
    private final StoreChangelogReader changelogReader = new StoreChangelogReader(this.restoreStateConsumer, Duration.ZERO, this.stateRestoreListener, new LogContext("stream-task-test ")) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.2
        public Map<TopicPartition, Long> restoredOffsets() {
            return Collections.singletonMap(StreamTaskTest.this.changelogPartition, StreamTaskTest.this.offset);
        }
    };
    // Serialized Integer 10 and Integer 1 (serialized with a null topic).
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);
    // Metrics registry recording at DEBUG level.
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    private final TaskId taskId00 = new TaskId(0, 0);
    // Mock clock handed to tasks; tests advance it with time.sleep(...).
    private final MockTime time = new MockTime();
    /**
     * Punctuator that records the timestamp it was invoked with in {@code punctuatedAt}.
     *
     * <p>The decompiler could not inline the synthetic accessor
     * {@code access$202(StreamTaskTest, long)} and emitted a stub that threw
     * {@link UnsupportedOperationException}. The bytecode dump shows the method only calls that
     * accessor with the enclosing instance and the timestamp argument, i.e. it assigns a
     * {@code long} field of the enclosing test; {@code punctuatedAt} is the only mutable
     * {@code long} field declared on it.
     */
    private final Punctuator punctuator = new Punctuator() { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.3
        @Override
        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    /** No-argument constructor; all fixture state is set up by the field initializers and {@code setup()}. */
    public StreamTaskTest() {
    }

    /**
     * Creates a {@link ProcessorTopology} from the given processor nodes, source-by-topic map and
     * repartition topic names; every other constructor argument is an empty collection.
     *
     * @param list processor nodes of the topology
     * @param map  source nodes keyed by topic name
     * @param set  names of the repartition topics
     * @return the assembled topology
     */
    private static ProcessorTopology withRepartitionTopics(List<ProcessorNode> list, Map<String, SourceNode> map, Set<String> set) {
        return new ProcessorTopology(
            list,
            map,
            Collections.emptyMap(),
            Collections.emptyList(),
            Collections.emptyList(),
            Collections.emptyMap(),
            set
        );
    }

    /**
     * Creates a {@link ProcessorTopology} from the given processor nodes and source-by-topic map,
     * with no repartition topics and every other constructor argument empty.
     *
     * @param list processor nodes of the topology
     * @param map  source nodes keyed by topic name
     * @return the assembled topology
     */
    private static ProcessorTopology withSources(List<ProcessorNode> list, Map<String, SourceNode> map) {
        // Same construction as withRepartitionTopics, just with an empty repartition-topic set.
        final Set<String> noRepartitionTopics = Collections.emptySet();
        return withRepartitionTopics(list, map, noRepartitionTopics);
    }

    /**
     * Builds the {@link StreamsConfig} shared by the tests in this class.
     *
     * <p>The state directory is the canonical path of {@code BASE_DIR}; the processing guarantee is
     * {@code "exactly_once"} when {@code z} is {@code true} and {@code "at_least_once"} otherwise.
     *
     * @param z whether to configure the exactly-once processing guarantee
     * @return the streams configuration
     * @throws RuntimeException wrapping the {@link IOException} if the canonical path cannot be resolved
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static StreamsConfig createConfig(boolean z) {
        final String processingGuarantee = z ? "exactly_once" : "at_least_once";
        try {
            return new StreamsConfig(Utils.mkProperties(Utils.mkMap(
                Utils.mkEntry("application.id", APPLICATION_ID),
                Utils.mkEntry("bootstrap.servers", "localhost:2171"),
                Utils.mkEntry("buffered.records.per.partition", "3"),
                Utils.mkEntry("state.dir", BASE_DIR.getCanonicalPath()),
                Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName()),
                Utils.mkEntry("processing.guarantee", processingGuarantee),
                Utils.mkEntry("max.task.idle.ms", "100")
            )));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Assigns both input partitions to the mock consumer, sets their beginning offsets to 0 and
     * creates a fresh {@link StateDirectory} from the non-EOS test configuration.
     */
    @Before
    public void setup() {
        final List<TopicPartition> assignment = Arrays.asList(partition1, partition2);
        consumer.assign(assignment);

        final Map<TopicPartition, Long> beginningOffsets = Utils.mkMap(
            Utils.mkEntry(partition1, 0L),
            Utils.mkEntry(partition2, 0L));
        consumer.updateBeginningOffsets(beginningOffsets);

        final StreamsConfig config = createConfig(false);
        stateDirectory = new StateDirectory(config, new MockTime(), true);
    }

    /**
     * Closes the task under test (if one was created) and always deletes {@code BASE_DIR}.
     *
     * <p>Written as a plain {@code try/finally} so the directory is deleted exactly once on every
     * path; the decompiled {@code catch (Throwable)} form ran the delete a second time when the
     * first delete itself failed.
     *
     * @throws IOException if deleting {@code BASE_DIR} fails
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (this.task != null) {
                try {
                    this.task.close(true, false);
                } catch (Exception ignored) {
                    // Deliberately best-effort: a failure while closing the task must not prevent
                    // the state directory from being removed, nor mask the test's own outcome.
                }
            }
        } finally {
            Utils.delete(BASE_DIR);
        }
    }

    /**
     * Constructing a {@link StreamTask} with the exactly-once config must surface a
     * {@link TimeoutException} thrown by the producer's {@code initTransactions()} as a
     * {@link StreamsException} with the expected message and cause, and log the timeout at ERROR.
     *
     * <p>The log appender is unregistered in a {@code finally} block so that a failing assertion
     * does not leave it registered for subsequent tests.
     */
    @Test
    public void shouldHandleInitTransactionsTimeoutExceptionOnCreation() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            ProcessorTopology processorTopology = withSources(Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)}));
            this.source1.addChild(this.processorStreamTime);
            this.source2.addChild(this.processorStreamTime);
            this.source1.addChild(this.processorSystemTime);
            this.source2.addChild(this.processorSystemTime);
            try {
                new StreamTask(this.taskId00, this.partitions, processorTopology, this.consumer, this.changelogReader, createConfig(true), this.streamsMetrics, this.stateDirectory, (ThreadCache) null, this.time, () -> {
                    // Producer whose transaction initialization always times out.
                    MockProducer<byte[], byte[]> mockProducer = new MockProducer<byte[], byte[]>(false, this.bytesSerializer, this.bytesSerializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.4
                        public void initTransactions() {
                            throw new TimeoutException("test");
                        }
                    };
                    this.producer = mockProducer;
                    return mockProducer;
                }, (RecordCollector) null);
                Assert.fail("Expected an exception");
            } catch (StreamsException e) {
                assertTimeoutErrorLog(appender);
                MatcherAssert.assertThat(e.getMessage(), CoreMatchers.is("stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Failed to initialize task 0_0 due to timeout."));
                // JUnit's contract is assertEquals(expected, actual).
                Assert.assertEquals(TimeoutException.class, e.getCause().getClass());
                MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.is("test"));
            }
        } finally {
            LogCaptureAppender.unregister(appender);
        }
    }

    /**
     * Resuming a suspended {@link StreamTask} built with the exactly-once config must surface a
     * {@link TimeoutException} thrown by the producer's {@code initTransactions()} as a
     * {@link StreamsException} with the expected message and cause, and log the timeout at ERROR.
     *
     * <p>The producer only starts timing out once {@code failInitTransactions} is set, i.e. after
     * the task has been created, initialized and suspended. The log appender is unregistered in a
     * {@code finally} block so that a failing assertion does not leave it registered.
     */
    @Test
    public void shouldHandleInitTransactionsTimeoutExceptionOnResume() {
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        try {
            ProcessorTopology processorTopology = withSources(Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)}));
            this.source1.addChild(this.processorStreamTime);
            this.source2.addChild(this.processorStreamTime);
            this.source1.addChild(this.processorSystemTime);
            this.source2.addChild(this.processorSystemTime);
            AtomicBoolean failInitTransactions = new AtomicBoolean(false);
            StreamTask streamTask = new StreamTask(this.taskId00, this.partitions, processorTopology, this.consumer, this.changelogReader, createConfig(true), this.streamsMetrics, this.stateDirectory, (ThreadCache) null, this.time, () -> {
                MockProducer<byte[], byte[]> mockProducer = new MockProducer<byte[], byte[]>(false, this.bytesSerializer, this.bytesSerializer) { // from class: org.apache.kafka.streams.processor.internals.StreamTaskTest.5
                    public void initTransactions() {
                        if (failInitTransactions.get()) {
                            throw new TimeoutException("test");
                        }
                        super.initTransactions();
                    }
                };
                this.producer = mockProducer;
                return mockProducer;
            }, (RecordCollector) null);
            streamTask.initializeTopology();
            streamTask.suspend();
            // From here on every initTransactions() call on a supplied producer times out.
            failInitTransactions.set(true);
            try {
                streamTask.resume();
                Assert.fail("Expected an exception");
            } catch (StreamsException e) {
                assertTimeoutErrorLog(appender);
                MatcherAssert.assertThat(e.getMessage(), CoreMatchers.is("stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Failed to initialize task 0_0 due to timeout."));
                // JUnit's contract is assertEquals(expected, actual).
                Assert.assertEquals(TimeoutException.class, e.getCause().getClass());
                MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.is("test"));
            }
        } finally {
            LogCaptureAppender.unregister(appender);
        }
    }

    /**
     * Asserts that the captured log contains the init-transactions timeout message for task 0_0
     * exactly once, and that it was logged at level {@code "ERROR"}.
     *
     * @param logCaptureAppender appender holding the log events captured during the test
     */
    private void assertTimeoutErrorLog(LogCaptureAppender logCaptureAppender) {
        final String expectedMessage = "stream-thread [" + Thread.currentThread().getName() + "] task [0_0] Timeout exception caught when initializing transactions for task 0_0. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.";

        // Levels of all captured events whose message matches exactly.
        final List<String> matchingLevels = logCaptureAppender.getEvents().stream()
            .filter(event -> event.getMessage().equals(expectedMessage))
            .map(event -> event.getLevel())
            .collect(Collectors.toList());

        MatcherAssert.assertThat(matchingLevels, CoreMatchers.is(Collections.singletonList("ERROR")));
    }

    /**
     * Buffers three records per partition and checks, after each {@code process()} call, the
     * number of records still buffered and how many records each source node has received.
     */
    @Test
    public void testProcessOrder() {
        task = createStatelessTask(createConfig(false), "latest");
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)));

        // Expected state after each process() call: {numBuffered, source1.numReceived, source2.numReceived}.
        final long[][] expectedAfterEachStep = {
            {5L, 1L, 0L},
            {4L, 2L, 0L},
            {3L, 2L, 1L},
            {2L, 3L, 1L},
            {1L, 3L, 2L},
            {0L, 3L, 3L},
        };
        for (final long[] expected : expectedAfterEachStep) {
            Assert.assertTrue(task.process());
            Assert.assertEquals(expected[0], task.numBuffered());
            Assert.assertEquals(expected[1], source1.numReceived);
            Assert.assertEquals(expected[2], source2.numReceived);
        }
    }

    /** Runs the shared metrics checks with the built-in metrics version {@code "0.10.0-2.4"}. */
    @Test
    public void testMetricsWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        testMetrics(builtInMetricsVersion);
    }

    /** Runs the shared metrics checks with the built-in metrics version {@code "latest"}. */
    @Test
    public void testMetricsWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        testMetrics(builtInMetricsVersion);
    }

    /**
     * Creates a stateless task for the given built-in metrics version and verifies that the
     * task-level metrics common to both versions exist, runs the version-specific checks, and
     * finally checks that the expected MBeans are exposed through a {@link JmxReporter}.
     *
     * @param str the built-in metrics version, either {@code "0.10.0-2.4"} or {@code "latest"}
     */
    private void testMetrics(String str) {
        task = createStatelessTask(createConfig(false), str);

        // {operation, metric-name template} pairs that must exist for the task in both versions.
        final String[][] commonTaskMetrics = {
            {"commit", "%s-latency-avg"},
            {"commit", "%s-latency-max"},
            {"commit", "%s-rate"},
            {"commit", "%s-total"},
            {"enforced-processing", "%s-rate"},
            {"enforced-processing", "%s-total"},
            {"record-lateness", "%s-avg"},
            {"record-lateness", "%s-max"},
        };
        for (final String[] metric : commonTaskMetrics) {
            Assert.assertNotNull(getMetric(metric[0], metric[1], task.id().toString(), str));
        }

        final boolean isVersion0100To24 = "0.10.0-2.4".equals(str);
        if (isVersion0100To24) {
            testMetricsForBuiltInMetricsVersion0100To24();
        } else {
            testMetricsForBuiltInMetricsVersionLatest();
        }

        final String threadName = Thread.currentThread().getName();
        final JmxReporter reporter = new JmxReporter("kafka.streams");
        metrics.addReporter(reporter);

        // The tag carrying the thread name differs between the two metrics versions.
        final String threadTagKey = "latest".equals(str) ? "thread-id" : "client-id";
        final String taskMbean = String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", threadTagKey, threadName, task.id.toString());
        Assert.assertTrue(reporter.containsMbean(taskMbean));
        if (isVersion0100To24) {
            final String allTasksMbean = String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=all", threadTagKey, threadName);
            Assert.assertTrue(reporter.containsMbean(allTasksMbean));
        }
    }

    /**
     * Checks specific to metrics version {@code "latest"}: no commit metrics are registered under
     * task-id {@code "all"}, while process-latency and punctuate metrics exist for the task.
     */
    private void testMetricsForBuiltInMetricsVersionLatest() {
        final String version = "latest";

        final String[] absentCommitMetrics = {"%s-latency-avg", "%s-latency-max", "%s-rate", "%s-total"};
        for (final String template : absentCommitMetrics) {
            Assert.assertNull(getMetric("commit", template, "all", version));
        }

        final String[][] presentTaskMetrics = {
            {"process", "%s-latency-avg"},
            {"process", "%s-latency-max"},
            {"punctuate", "%s-latency-avg"},
            {"punctuate", "%s-latency-max"},
            {"punctuate", "%s-rate"},
            {"punctuate", "%s-total"},
        };
        for (final String[] metric : presentTaskMetrics) {
            Assert.assertNotNull(getMetric(metric[0], metric[1], task.id().toString(), version));
        }
    }

    /**
     * Checks specific to metrics version {@code "0.10.0-2.4"}: commit latency and rate metrics are
     * registered under task-id {@code "all"}, while process-latency and punctuate metrics do not
     * exist for the task.
     */
    private void testMetricsForBuiltInMetricsVersion0100To24() {
        final String version = "0.10.0-2.4";

        final String[] presentCommitMetrics = {"%s-latency-avg", "%s-latency-max", "%s-rate"};
        for (final String template : presentCommitMetrics) {
            Assert.assertNotNull(getMetric("commit", template, "all", version));
        }

        final String[][] absentTaskMetrics = {
            {"process", "%s-latency-avg"},
            {"process", "%s-latency-max"},
            {"punctuate", "%s-latency-avg"},
            {"punctuate", "%s-latency-max"},
            {"punctuate", "%s-rate"},
            {"punctuate", "%s-total"},
        };
        for (final String[] metric : absentTaskMetrics) {
            Assert.assertNull(getMetric(metric[0], metric[1], task.id().toString(), version));
        }
    }

    /**
     * Looks up a metric in group {@code "stream-task-metrics"} of the test's metrics registry.
     *
     * @param str  operation name substituted into the name template
     * @param str2 metric-name template containing a single {@code %s}
     * @param str3 value of the {@code "task-id"} tag
     * @param str4 built-in metrics version; {@code "latest"} selects the {@code "thread-id"} tag,
     *             anything else the {@code "client-id"} tag, for the current thread's name
     * @return the registered metric, or {@code null} if none matches
     */
    private KafkaMetric getMetric(String str, String str2, String str3, String str4) {
        final String metricName = String.format(str2, str);
        final String threadTagKey = "latest".equals(str4) ? "thread-id" : "client-id";
        final Map<String, String> tags = Utils.mkMap(
            Utils.mkEntry("task-id", str3),
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName()));

        final MetricName lookupKey = metrics.metricName(metricName, "stream-task-metrics", "", tags);
        return metrics.metrics().get(lookupKey);
    }

    /**
     * Checks which partitions the consumer reports as paused while records are added to and
     * processed from the task's buffers (the test config sets
     * {@code buffered.records.per.partition} to 3).
     */
    @Test
    public void testPauseResume() {
        task = createStatelessTask(createConfig(false), "latest");
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 10L),
            getConsumerRecord(partition1, 20L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L),
            getConsumerRecord(partition2, 55L),
            getConsumerRecord(partition2, 65L)));

        // First record processed: only partition2 is paused.
        Assert.assertTrue(task.process());
        Assert.assertEquals(1L, source1.numReceived);
        Assert.assertEquals(0L, source2.numReceived);
        Set<TopicPartition> paused = consumer.paused();
        Assert.assertEquals(1L, paused.size());
        Assert.assertTrue(paused.contains(partition2));

        // Three more records for partition1: now both partitions are paused.
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 30L),
            getConsumerRecord(partition1, 40L),
            getConsumerRecord(partition1, 50L)));
        paused = consumer.paused();
        Assert.assertEquals(2L, paused.size());
        Assert.assertTrue(paused.contains(partition1));
        Assert.assertTrue(paused.contains(partition2));

        // The next two process() calls both leave only partition2 paused.
        for (long expectedFromSource1 = 2L; expectedFromSource1 <= 3L; expectedFromSource1++) {
            Assert.assertTrue(task.process());
            Assert.assertEquals(expectedFromSource1, source1.numReceived);
            Assert.assertEquals(0L, source2.numReceived);
            paused = consumer.paused();
            Assert.assertEquals(1L, paused.size());
            Assert.assertTrue(paused.contains(partition2));
        }

        // First record from partition2 processed: nothing is paused any more.
        Assert.assertTrue(task.process());
        Assert.assertEquals(3L, source1.numReceived);
        Assert.assertEquals(1L, source2.numReceived);
        Assert.assertEquals(0L, consumer.paused().size());
    }

    /**
     * Feeds four records per partition with gaps between timestamps and checks, after every
     * {@code process()} call, the buffer size, the per-source record counts and whether
     * {@code maybePunctuateStreamTime()} fires; finally verifies the recorded stream-time
     * punctuations are 20, 142, 155 and 160.
     */
    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 142L),
            getConsumerRecord(partition1, 155L),
            getConsumerRecord(partition1, 160L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 145L),
            getConsumerRecord(partition2, 159L),
            getConsumerRecord(partition2, 161L)));

        // Nothing has been processed yet, so no punctuation is due.
        Assert.assertFalse(task.maybePunctuateStreamTime());

        // Expected state after each process() call:
        // {numBuffered, source1.numReceived, source2.numReceived, punctuates (1 = yes, 0 = no)}.
        final long[][] steps = {
            {7L, 1L, 0L, 1L},
            {6L, 1L, 1L, 0L},
            {5L, 2L, 1L, 1L},
            {4L, 2L, 2L, 0L},
            {3L, 3L, 2L, 1L},
            {2L, 3L, 3L, 0L},
            {1L, 4L, 3L, 1L},
            {0L, 4L, 4L, 0L},
        };
        for (final long[] step : steps) {
            Assert.assertTrue(task.process());
            Assert.assertEquals(step[0], task.numBuffered());
            Assert.assertEquals(step[1], source1.numReceived);
            Assert.assertEquals(step[2], source2.numReceived);
            if (step[3] == 1L) {
                Assert.assertTrue(task.maybePunctuateStreamTime());
            } else {
                Assert.assertFalse(task.maybePunctuateStreamTime());
            }
        }

        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20, 142, 155, 160);
    }

    /**
     * After the stream-time punctuation schedule is cancelled, {@code maybePunctuateStreamTime()}
     * must no longer fire; only the punctuation at 20 is expected to have been recorded.
     */
    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();
        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 20L),
            getConsumerRecord(partition1, 30L),
            getConsumerRecord(partition1, 40L)));
        task.addRecords(partition2, Arrays.asList(
            getConsumerRecord(partition2, 25L),
            getConsumerRecord(partition2, 35L),
            getConsumerRecord(partition2, 45L)));

        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process());
        Assert.assertTrue(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process());
        Assert.assertFalse(task.maybePunctuateStreamTime());

        Assert.assertTrue(task.process());
        // Cancel before checking again: the pending punctuation must be suppressed.
        processorStreamTime.mockProcessor.scheduleCancellable.cancel();
        Assert.assertFalse(task.maybePunctuateStreamTime());

        processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20);
    }

    /**
     * After the wall-clock punctuation schedule is cancelled, {@code maybePunctuateSystemTime()}
     * must no longer fire; only the punctuation 10 ms after the start is expected to have been
     * recorded.
     */
    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        final long startMs = time.milliseconds();

        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());

        // Cancel, advance the mock clock again, and verify nothing fires.
        processorSystemTime.mockProcessor.scheduleCancellable.cancel();
        time.sleep(10L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, startMs + 10);
    }

    /**
     * {@code commitNeeded()} must become {@code true} after processing a record, after a
     * stream-time punctuation and after a wall-clock punctuation, and be reset to {@code false}
     * by each {@code commit()}.
     */
    @Test
    public void shouldRespectCommitNeeded() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();
        Assert.assertFalse(task.commitNeeded());

        // Processing a record requires a commit.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        Assert.assertTrue(task.process());
        Assert.assertTrue(task.commitNeeded());
        task.commit();
        Assert.assertFalse(task.commitNeeded());

        // A stream-time punctuation requires a commit.
        Assert.assertTrue(task.maybePunctuateStreamTime());
        Assert.assertTrue(task.commitNeeded());
        task.commit();
        Assert.assertFalse(task.commitNeeded());

        // A wall-clock punctuation requires a commit.
        time.sleep(10L);
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertTrue(task.commitNeeded());
        task.commit();
        Assert.assertFalse(task.commitNeeded());
    }

    /**
     * With one of two buffered records processed, the committed offset for partition1 must be 5,
     * the value passed to {@code getConsumerRecord} for the record still in the queue.
     */
    @Test
    public void shouldCommitNextOffsetFromQueueIfAvailable() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        task.addRecords(partition1, Arrays.asList(
            getConsumerRecord(partition1, 0L),
            getConsumerRecord(partition1, 5L)));
        task.process();
        task.commit();

        final Map<TopicPartition, Long> committed = getCommittetOffsets(consumer.committed(partitions));
        final Map<TopicPartition, Long> expected = Utils.mkMap(Utils.mkEntry(partition1, 5L));
        MatcherAssert.assertThat(committed, CoreMatchers.equalTo(expected));
    }

    /**
     * Feeds three records to the mock consumer and polls it, but hands only one record to
     * the task. After processing and committing, the committed offset for
     * {@code partition1} is asserted to be 3.
     */
    @Test
    public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        // Give the consumer three records and poll them.
        consumer.addRecord(getConsumerRecord(partition1, 0L));
        consumer.addRecord(getConsumerRecord(partition1, 1L));
        consumer.addRecord(getConsumerRecord(partition1, 2L));
        consumer.poll(Duration.ZERO);

        // The task itself only ever sees (and processes) a single record.
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();
        task.commit();

        final Map<TopicPartition, Long> committedOffsets = getCommittetOffsets(consumer.committed(partitions));
        final Map<TopicPartition, Long> expectedOffsets = Utils.mkMap(Utils.mkEntry(partition1, 3L));
        MatcherAssert.assertThat(committedOffsets, CoreMatchers.equalTo(expectedOffsets));
    }

    /**
     * Reduces a map of committed {@link OffsetAndMetadata} to a map holding only the
     * numeric offset of each partition.
     *
     * @param map committed metadata keyed by partition; every value must be non-null
     * @return a new mutable map from each partition to its committed offset
     */
    private Map<TopicPartition, Long> getCommittetOffsets(Map<TopicPartition, OffsetAndMetadata> map) {
        // Typed map instead of a raw HashMap so the compiler checks what is put into it.
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final Map.Entry<TopicPartition, OffsetAndMetadata> committed : map.entrySet()) {
            offsets.put(committed.getKey(), committed.getValue().offset());
        }
        return offsets;
    }

    /**
     * Processes and commits one record, checks that the committed metadata decodes to
     * {@code DEFAULT_TIMESTAMP}, then creates a new task and asserts that
     * {@code initializeMetadata()} brings partition time and stream time back to that value.
     */
    @Test
    public void shouldRestorePartitionTimeAfterRestartWithEosDisabled() {
        createTaskWithProcessAndCommit(false);

        // The timestamp must be recoverable from the committed offset metadata.
        final OffsetAndMetadata committed =
            (OffsetAndMetadata) consumer.committed(Collections.singleton(partition1)).get(partition1);
        Assert.assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(committed.metadata()));

        // A new task instance stands in for the restart.
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeMetadata();

        Assert.assertEquals(DEFAULT_TIMESTAMP, task.partitionTime(partition1));
        Assert.assertEquals(DEFAULT_TIMESTAMP, task.streamTime());
    }

    /**
     * Same scenario as the EOS-disabled variant, but with {@code createConfig(true)}:
     * the committed offsets are first moved from the producer to the consumer via
     * {@code moveCommittedOffsetsFromProducerToConsumer} before the new task is created.
     */
    @Test
    public void shouldRestorePartitionTimeAfterRestartWithEosEnabled() {
        createTaskWithProcessAndCommit(true);
        moveCommittedOffsetsFromProducerToConsumer(DEFAULT_TIMESTAMP);

        // A new task instance stands in for the restart.
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeMetadata();

        Assert.assertEquals(DEFAULT_TIMESTAMP, task.partitionTime(partition1));
        Assert.assertEquals(DEFAULT_TIMESTAMP, task.streamTime());
    }

    /**
     * Creates and initializes a stateless task, then buffers, processes and commits one
     * record built with {@code DEFAULT_TIMESTAMP}.
     *
     * @param z forwarded to {@code createConfig}; the callers in this class pass
     *          {@code true} for their EOS-enabled scenario and {@code false} otherwise
     */
    private void createTaskWithProcessAndCommit(boolean z) {
        task = createStatelessTask(createConfig(z), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        task.addRecords(
            partition1,
            Collections.singletonList(getConsumerRecord(partition1, DEFAULT_TIMESTAMP))
        );
        task.process();
        task.commit();
    }

    /** Round trip: decoding an encoded {@code DEFAULT_TIMESTAMP} must give the same value back. */
    @Test
    public void shouldEncodeAndDecodeMetadata() {
        task = createStatelessTask(createConfig(false), "latest");

        Assert.assertEquals(
            DEFAULT_TIMESTAMP,
            task.decodeTimestamp(task.encodeTimestamp(DEFAULT_TIMESTAMP))
        );
    }

    /**
     * Decoding the Base64 form of the single byte {@code 2} is expected to return -1.
     * NOTE(review): going by the test name, that byte is a version the decoder does not
     * know - confirm against the encoder.
     */
    @Test
    public void shouldReturnUnknownTimestampIfUnknownVersion() {
        task = createStatelessTask(createConfig(false), "latest");

        final String encodedMetadata = Base64.getEncoder().encodeToString(new byte[]{2});
        Assert.assertEquals(-1L, task.decodeTimestamp(encodedMetadata));
    }

    /** Decoding an empty metadata string is expected to return -1. */
    @Test
    public void shouldReturnUnknownTimestampIfEmptyMessage() {
        task = createStatelessTask(createConfig(false), "latest");

        final String emptyMetadata = "";
        Assert.assertEquals(-1L, task.decodeTimestamp(emptyMetadata));
    }

    /** After {@code requestCommit()}, {@code commitRequested()} must report true. */
    @Test
    public void shouldRespectCommitRequested() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        task.requestCommit();

        Assert.assertTrue(task.commitRequested());
    }

    /**
     * The task is asserted not to be processable at time 0 while no partition, or only
     * {@code partition1}, has a buffered record, and processable once {@code partition2}
     * has one as well.
     */
    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        // Nothing buffered yet.
        Assert.assertFalse(task.isProcessable(0L));

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // Only partition1 has data.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.isProcessable(0L));

        // Both partitions have data.
        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>("topic2", 1, 0L, payload, payload)));
        Assert.assertTrue(task.isProcessable(0L));
    }

    /**
     * With data on only one partition, the task is asserted to become processable once the
     * time passed to {@code isProcessable} is 100 ms past the current mock time, and the
     * "enforced-processing-total" metric is asserted to grow by one for each such enforced
     * decision.
     * NOTE(review): the 100 ms threshold presumably comes from the idle setting in
     * {@code createConfig} - confirm.
     */
    @Test
    public void shouldBeProcessableIfWaitedForTooLong() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        final MetricName enforcedProcessing = metrics.metricName(
            "enforced-processing-total",
            "stream-task-metrics",
            Utils.mkMap(
                Utils.mkEntry("thread-id", Thread.currentThread().getName()),
                Utils.mkEntry("task-id", taskId00.toString())
            )
        );

        // No data at all: not processable, nothing enforced.
        Assert.assertFalse(task.isProcessable(0L));
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(enforcedProcessing).metricValue());

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // Only partition1 buffered: not processable before +100, processable from +100 on.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.isProcessable(time.milliseconds()));
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 99));
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 100));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        // Asking again while still starved counts as another enforced decision.
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 101));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        // With both partitions buffered the task is processable without touching the metric.
        task.addRecords(partition2, Collections.singleton(new ConsumerRecord<>("topic2", 1, 0L, payload, payload)));
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 130));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());

        // After processing one record: not processable at +150 and +249, enforced again at +250.
        task.process();
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 150));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 249));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 250));
        Assert.assertEquals(Double.valueOf(3.0d), metrics.metric(enforcedProcessing).metricValue());
    }

    /**
     * Exercises {@code isProcessable} when records only ever arrive on {@code partition1}:
     * processing is enforced at +100, is not possible right after the single record has
     * been processed, and for a newly added record is enforced again only at +250. The
     * "enforced-processing-total" metric is checked after every step.
     */
    @Test
    public void shouldNotBeProcessableIfNoDataAvailble() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        final MetricName enforcedProcessing = metrics.metricName(
            "enforced-processing-total",
            "stream-task-metrics",
            Utils.mkMap(
                Utils.mkEntry("thread-id", Thread.currentThread().getName()),
                Utils.mkEntry("task-id", taskId00.toString())
            )
        );

        // No data at all: not processable, nothing enforced.
        Assert.assertFalse(task.isProcessable(0L));
        Assert.assertEquals(Double.valueOf(0.0d), metrics.metric(enforcedProcessing).metricValue());

        final byte[] payload = ByteBuffer.allocate(4).putInt(1).array();

        // One record on partition1: enforced processing kicks in at +100.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.isProcessable(time.milliseconds()));
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 99));
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 100));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        // The record is consumed; the task is not processable and the metric is unchanged.
        task.process();
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 110));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());

        // A new record on partition1: not processable at +150 and +249, enforced at +250.
        task.addRecords(partition1, Collections.singleton(new ConsumerRecord<>("topic1", 1, 0L, payload, payload)));
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 150));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());
        Assert.assertFalse(task.isProcessable(time.milliseconds() + 249));
        Assert.assertEquals(Double.valueOf(1.0d), metrics.metric(enforcedProcessing).metricValue());
        Assert.assertTrue(task.isProcessable(time.milliseconds() + 250));
        Assert.assertEquals(Double.valueOf(2.0d), metrics.metric(enforcedProcessing).metricValue());
    }

    /**
     * Advances the mock clock in steps and checks when wall-clock punctuation fires.
     * The expected punctuation timestamps are start + 10, + 20, + 30 and + 50.
     */
    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        final long startMs = time.milliseconds();

        time.sleep(10L);   // start + 10
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(10L);   // start + 20
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(9L);    // start + 29: only 9 ms since the last punctuation
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(1L);    // start + 30
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(20L);   // start + 50: fires once, an immediate second call does nothing
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.WALL_CLOCK_TIME,
            startMs + 10, startMs + 20, startMs + 30, startMs + 50
        );
    }

    /**
     * Wall-clock punctuation must not fire immediately nor after only 9 ms of mock time;
     * the processor is checked against an empty list of punctuation timestamps.
     */
    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(9L);
        Assert.assertFalse(task.maybePunctuateSystemTime());

        final long[] noPunctuations = new long[0];
        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, noPunctuations);
    }

    /**
     * Checks that a large jump of the mock clock causes a single wall-clock punctuation
     * rather than one per missed interval. The expected punctuation timestamps are
     * start + 100, + 110, + 122, + 130, + 235 and + 240.
     */
    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        final long startMs = time.milliseconds();

        time.sleep(100L);  // start + 100: one punctuation despite the long gap
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(10L);   // start + 110
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(12L);   // start + 122
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(7L);    // start + 129: nothing fires here
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(1L);    // start + 130
        Assert.assertTrue(task.maybePunctuateSystemTime());

        time.sleep(105L);  // start + 235: again exactly one punctuation after a gap
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        time.sleep(5L);    // start + 240
        Assert.assertTrue(task.maybePunctuateSystemTime());
        Assert.assertFalse(task.maybePunctuateSystemTime());

        processorSystemTime.mockProcessor.checkAndClearPunctuateResult(
            PunctuationType.WALL_CLOCK_TIME,
            startMs + 100, startMs + 110, startMs + 122, startMs + 130, startMs + 235, startMs + 240
        );
    }

    /**
     * Processing a record on the task from {@code createTaskThatThrowsException} must
     * throw, and the processor context must have no current node afterwards.
     * NOTE(review): the catch clause accepts any {@code Exception}, so the exception type
     * named in the failure message is not actually verified here.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
        task = createTaskThatThrowsException(false);
        task.initializeStateStores();
        task.initializeTopology();
        task.addRecords(partition2, Collections.singletonList(getConsumerRecord(partition2, 0L)));

        try {
            task.process();
            Assert.fail("Should've thrown StreamsException");
        } catch (final Exception expected) {
            // The failed call must not leave a node set on the context.
            MatcherAssert.assertThat(task.processorContext.currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * A {@code KafkaException} thrown by a stream-time punctuator must surface as a
     * {@code StreamsException} whose message names the punctuated processor, and the
     * processor context must have no current node afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        try {
            task.punctuate(processorStreamTime, 1L, PunctuationType.STREAM_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (final StreamsException expected) {
            final String message = expected.getMessage();
            final String failureHint = "message=" + message + " should contain processor";
            final String processorFragment = "processor '" + processorStreamTime.name() + "'";

            Assert.assertTrue(failureHint, message.contains(processorFragment));
            MatcherAssert.assertThat(task.processorContext.currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * A {@code KafkaException} thrown by a wall-clock punctuator must surface as a
     * {@code StreamsException} whose message names the punctuated processor, and the
     * processor context must have no current node afterwards.
     */
    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        try {
            task.punctuate(processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail("Should've thrown StreamsException");
        } catch (final StreamsException expected) {
            final String message = expected.getMessage();
            final String failureHint = "message=" + message + " should contain processor";
            final String processorFragment = "processor '" + processorSystemTime.name() + "'";

            Assert.assertTrue(failureHint, message.contains(processorFragment));
            MatcherAssert.assertThat(task.processorContext.currentNode(), CoreMatchers.nullValue());
        }
    }

    /**
     * Builds a task around a {@code MockRecordCollector} and checks that
     * {@code flushState()} marks that collector as flushed.
     */
    @Test
    public void shouldFlushRecordCollectorOnFlushState() {
        final MockStreamsMetrics taskMetrics = new MockStreamsMetrics(new Metrics());
        final MockRecordCollector collector = new MockRecordCollector();

        // Local task only; the shared task field is deliberately left untouched.
        final StreamTask flushingTask = new StreamTask(
            taskId00,
            partitions,
            topology,
            consumer,
            changelogReader,
            createConfig(false),
            taskMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> created = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = created;
                return created;
            },
            collector
        );
        flushingTask.flushState();

        Assert.assertTrue(collector.flushed());
    }

    /**
     * After a commit on a stateful task created with {@code createConfig(false)}, the
     * ".checkpoint" file in the task directory must contain exactly the changelog
     * partition mapped to {@code offset}.
     */
    @Test
    public void shouldCheckpointOffsetsOnCommit() throws IOException {
        task = createStatefulTask(createConfig(false), true);
        task.initializeStateStores();
        task.initializeTopology();

        task.commit();

        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00), ".checkpoint");
        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
        MatcherAssert.assertThat(
            checkpoint.read(),
            CoreMatchers.equalTo(Collections.singletonMap(changelogPartition, offset))
        );
    }

    /**
     * After a commit on a stateful task created with {@code createConfig(true)}, no
     * ".checkpoint" file may exist in the task directory.
     */
    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        task = createStatefulTask(createConfig(true), true);
        task.initializeStateStores();
        task.initializeTopology();

        task.commit();

        final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00), ".checkpoint");
        Assert.assertFalse(checkpointFile.exists());
    }

    /**
     * Calling {@code punctuate} while the processor context already has a current node
     * must throw an {@code IllegalStateException}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        // Simulate a node that is still "current" when punctuate is invoked.
        task.processorContext.setCurrentNode(processorStreamTime);

        try {
            task.punctuate(processorStreamTime, 10L, PunctuationType.STREAM_TIME, punctuator);
            Assert.fail("Should throw illegal state exception as current node is not null");
        } catch (final IllegalStateException expected) {
            // expected - reaching this branch is the assertion
        }
    }

    /**
     * Punctuating with timestamps 5 and then 10 must update {@code punctuatedAt} to the
     * timestamp of the most recent call each time.
     */
    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat(Long.valueOf(punctuatedAt), CoreMatchers.equalTo(5L));

        task.punctuate(processorStreamTime, 10L, PunctuationType.STREAM_TIME, punctuator);
        MatcherAssert.assertThat(Long.valueOf(punctuatedAt), CoreMatchers.equalTo(10L));
    }

    /** After a successful {@code punctuate} call the task context must have no current node. */
    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        task = createStatelessTask(createConfig(false), "latest");
        task.initializeStateStores();
        task.initializeTopology();

        task.punctuate(processorStreamTime, 5L, PunctuationType.STREAM_TIME, punctuator);

        MatcherAssert.assertThat(task.context().currentNode(), CoreMatchers.nullValue());
    }

    /**
     * Scheduling a punctuation on a task whose context has had no current node set must
     * throw an {@code IllegalStateException} (checked through the annotation).
     */
    @Test(expected = IllegalStateException.class)
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        task = createStatelessTask(createConfig(false), "latest");

        // The no-op punctuator is irrelevant; the call itself is expected to fail.
        task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> { });
    }

    /** Scheduling a punctuation must succeed once a current node is set on the context. */
    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        task = createStatelessTask(createConfig(false), "latest");
        task.processorContext.setCurrentNode(processorStreamTime);

        // Passes if no exception escapes.
        task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> { });
    }

    /** {@code close(true, false)} on a task built with {@code createConfig(false)} must leave the producer open. */
    @Test
    public void shouldNotCloseProducerOnCleanCloseWithEosDisabled() {
        task = createStatelessTask(createConfig(false), "latest");

        task.close(true, false);
        // Drop the reference (presumably so teardown does not close the task again - confirm).
        task = null;

        Assert.assertFalse(producer.closed());
    }

    /** {@code close(false, false)} on a task built with {@code createConfig(false)} must leave the producer open. */
    @Test
    public void shouldNotCloseProducerOnUncleanCloseWithEosDisabled() {
        task = createStatelessTask(createConfig(false), "latest");

        task.close(false, false);
        // Drop the reference (presumably so teardown does not close the task again - confirm).
        task = null;

        Assert.assertFalse(producer.closed());
    }

    /**
     * When {@code close(true, false)} fails with a {@code RuntimeException} on the task
     * from {@code createTaskThatThrowsException(false)}, the producer must remain open.
     */
    @Test
    public void shouldNotCloseProducerOnErrorDuringCleanCloseWithEosDisabled() {
        task = createTaskThatThrowsException(false);

        try {
            task.close(true, false);
            Assert.fail("should have thrown runtime exception");
        } catch (final RuntimeException expected) {
            // The close attempt failed as intended; forget the task.
            task = null;
        }

        Assert.assertFalse(producer.closed());
    }

    /**
     * {@code close(false, false)} on the task from
     * {@code createTaskThatThrowsException(false)} must return normally and leave the
     * producer open.
     */
    @Test
    public void shouldNotCloseProducerOnErrorDuringUncleanCloseWithEosDisabled() {
        task = createTaskThatThrowsException(false);

        // No exception is expected to escape here.
        task.close(false, false);
        task = null;

        Assert.assertFalse(producer.closed());
    }

    /**
     * With {@code createConfig(true)}, {@code close(true, false)} must commit the
     * transaction, leave none in flight and close the producer.
     */
    @Test
    public void shouldCommitTransactionAndCloseProducerOnCleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.close(true, false);
        task = null;

        Assert.assertTrue(producer.transactionCommitted());
        Assert.assertFalse(producer.transactionInFlight());
        Assert.assertTrue(producer.closed());
    }

    /**
     * When {@code close(true, false)} fails on the task from
     * {@code createTaskThatThrowsException(true)}, the transaction must still be in
     * flight and the producer must not be closed.
     */
    @Test
    public void shouldNotAbortTransactionAndNotCloseProducerOnErrorDuringCleanCloseWithEosEnabled() {
        task = createTaskThatThrowsException(true);
        task.initializeTopology();

        try {
            task.close(true, false);
            Assert.fail("should have thrown runtime exception");
        } catch (final RuntimeException expected) {
            // The close attempt failed as intended; forget the task.
            task = null;
        }

        Assert.assertTrue(producer.transactionInFlight());
        Assert.assertFalse(producer.closed());
    }

    /**
     * With a fenced producer, {@code close(true, false)} must throw a
     * {@code TaskMigratedException} caused by a {@code RecoverableClientException}.
     * Afterwards the transaction is asserted to be neither committed nor aborted but
     * still in flight, and the producer to be closed.
     */
    @Test
    public void shouldOnlyCloseProducerIfFencedOnCommitDuringCleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducer();

        try {
            task.close(true, false);
            Assert.fail("should have thrown TaskMigratedException");
        } catch (final TaskMigratedException expected) {
            task = null;
            Assert.assertTrue(expected.getCause() instanceof RecoverableClientException);
        }

        // transactionCommitted() used to be asserted twice; once is enough.
        Assert.assertFalse(producer.transactionCommitted());
        Assert.assertTrue(producer.transactionInFlight());
        Assert.assertFalse(producer.transactionAborted());
        Assert.assertTrue(producer.closed());
    }

    /**
     * With the producer set to be fenced on close, {@code close(true, false)} must throw
     * a {@code TaskMigratedException} caused by a {@code RecoverableClientException}.
     * Afterwards the transaction is asserted to be committed with none in flight, and the
     * producer not to be closed.
     */
    @Test
    public void shouldNotCloseProducerIfFencedOnCloseDuringCleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducerOnClose();

        try {
            task.close(true, false);
            Assert.fail("should have thrown TaskMigratedException");
        } catch (final TaskMigratedException expected) {
            task = null;
            Assert.assertTrue(expected.getCause() instanceof RecoverableClientException);
        }

        Assert.assertTrue(producer.transactionCommitted());
        Assert.assertFalse(producer.transactionInFlight());
        Assert.assertFalse(producer.closed());
    }

    /**
     * With {@code createConfig(true)}, {@code close(false, false)} must close the
     * producer while the transaction is still reported as in flight.
     */
    @Test
    public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.close(false, false);
        task = null;

        Assert.assertTrue(producer.transactionInFlight());
        Assert.assertTrue(producer.closed());
    }

    /**
     * {@code close(false, false)} on the task from
     * {@code createTaskThatThrowsException(true)} must return normally and close the
     * producer.
     */
    @Test
    public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
        task = createTaskThatThrowsException(true);
        task.initializeTopology();

        // No exception is expected to escape here.
        task.close(false, false);

        Assert.assertTrue(producer.closed());
    }

    /**
     * With a fenced producer, {@code close(false, false)} must return normally. The
     * transaction is asserted to be still in flight, neither aborted nor committed, and
     * the producer to be closed.
     */
    @Test
    public void shouldOnlyCloseProducerIfFencedOnAbortDuringUncleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducer();

        task.close(false, false);
        task = null;

        Assert.assertTrue(producer.transactionInFlight());
        Assert.assertFalse(producer.transactionAborted());
        Assert.assertFalse(producer.transactionCommitted());
        Assert.assertTrue(producer.closed());
    }

    /**
     * Builds a task with one state store, fences the producer and checks that
     * {@code flushState()} throws a {@code TaskMigratedException} that reports this task
     * as the migrated one.
     */
    @Test
    public void shouldMigrateTaskIfFencedDuringFlush() {
        final MockKeyValueStore stateStore = new MockKeyValueStore("store", true, true);

        // Topology: the two source nodes plus a single store mapped to "store-changelog".
        final ProcessorTopology storeTopology = new ProcessorTopology(
            Arrays.asList(source1, source2),
            Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", source1), Utils.mkEntry("topic2", source2)}),
            Collections.emptyMap(),
            Collections.singletonList(stateStore),
            Collections.emptyList(),
            Collections.singletonMap("store", "store-changelog"),
            Collections.emptySet()
        );

        task = new StreamTask(
            taskId00,
            partitions,
            storeTopology,
            consumer,
            changelogReader,
            createConfig(true),
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> created = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = created;
                return created;
            }
        );
        task.initializeStateStores();
        task.initializeTopology();

        producer.fenceProducer();

        try {
            task.flushState();
            Assert.fail("Expected a TaskMigratedException");
        } catch (final TaskMigratedException expected) {
            MatcherAssert.assertThat(expected.migratedTask(), CoreMatchers.is(task));
        }
    }

    /**
     * Builds a source-to-sink topology with one state store, fences the producer and
     * checks that processing a record throws a {@code TaskMigratedException} that reports
     * this task as the migrated one.
     */
    @Test
    public void shouldMigrateTaskIfFencedDuringProcess() {
        MockKeyValueStore mockKeyValueStore = new MockKeyValueStore("store", true, true);

        // NOTE(review): the node locals stay raw because their generic parameters are not
        // visible from this method - tighten them once the field types are confirmed.
        ProcessorNode sourceNode = new SourceNode("test", Collections.singletonList("topic1"), new WallclockTimestampExtractor(), this.intDeserializer, this.intDeserializer);
        ProcessorNode sinkNode = new SinkNode("test-sink", new StaticTopicNameExtractor("out-topic"), this.intSerializer, this.intSerializer, new StreamPartitioner<Integer, Integer>() {
            // Always route to partition 1, whatever the record is.
            @Override
            public Integer partition(String str, Integer num, Integer num2, int i) {
                return 1;
            }
        });
        sourceNode.addChild(sinkNode);

        this.task = new StreamTask(this.taskId00, this.partitions, new ProcessorTopology(Arrays.asList(sourceNode, sinkNode), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", sourceNode), Utils.mkEntry("topic2", sourceNode)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("out-topic", sinkNode)}), Collections.singletonList(mockKeyValueStore), Collections.emptyList(), Collections.singletonMap("store", "store-changelog"), Collections.emptySet()), this.consumer, this.changelogReader, createConfig(true), this.streamsMetrics, this.stateDirectory, (ThreadCache) null, this.time, () -> {
            MockProducer<byte[], byte[]> mockProducer = new MockProducer<byte[], byte[]>(Cluster.empty(), false, new DefaultPartitioner(), this.bytesSerializer, this.bytesSerializer) {
                // Report a single (null) partition entry for every topic.
                @Override
                public List<PartitionInfo> partitionsFor(String str) {
                    return Collections.singletonList(null);
                }
            };
            this.producer = mockProducer;
            return mockProducer;
        });
        this.task.initializeStateStores();
        this.task.initializeTopology();

        this.producer.fenceProducer();

        try {
            this.task.addRecords(this.partition1, Collections.singletonList(getConsumerRecord(this.partition1, 0L)));
            this.task.process();
            Assert.fail("Expected a TaskMigratedException");
        } catch (TaskMigratedException e) {
            MatcherAssert.assertThat(e.migratedTask(), CoreMatchers.is(this.task));
        }
    }

    /**
     * Fences the producer and punctuates with a punctuator that sends a record through a
     * {@code RecordCollectorImpl} bound to that producer; the call must throw a
     * {@code TaskMigratedException} that reports this task as the migrated one.
     */
    @Test
    public void shouldMigrateTaskIfFencedDuringPunctuate() {
        task = createStatelessTask(createConfig(true), "latest");

        // Collector wired to the producer field as it stands after task creation.
        final RecordCollectorImpl collector = new RecordCollectorImpl(
            "StreamTask",
            new LogContext("StreamTaskTest "),
            new DefaultProductionExceptionHandler(),
            new Metrics().sensor("skipped-records")
        );
        collector.init(producer);

        task.initializeStateStores();
        task.initializeTopology();
        producer.fenceProducer();

        try {
            task.punctuate(processorSystemTime, 5L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                collector.send("result-topic1", 3, 5, (Headers) null, 0, Long.valueOf(time.milliseconds()), new IntegerSerializer(), new IntegerSerializer());
            });
            Assert.fail("Expected a TaskMigratedException");
        } catch (final TaskMigratedException expected) {
            MatcherAssert.assertThat(expected.migratedTask(), CoreMatchers.is(task));
        }
    }

    /**
     * Builds a task whose {@code flushState()} is stubbed out, fences the producer and
     * checks that {@code commit()} throws a {@code TaskMigratedException} that reports
     * this task as the migrated one.
     */
    @Test
    public void shouldMigrateTaskIfFencedDuringCommit() {
        ProcessorTopology withSources = withSources(Arrays.asList(this.source1, this.source2, this.processorStreamTime, this.processorSystemTime), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("topic1", this.source1), Utils.mkEntry("topic2", this.source2)}));
        // Both sources feed both punctuating processors.
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);

        this.task = new StreamTask(this.taskId00, this.partitions, withSources, this.consumer, this.changelogReader, createConfig(true), this.streamsMetrics, this.stateDirectory, null, this.time, () -> {
            MockProducer<byte[], byte[]> mockProducer = new MockProducer<>(false, this.bytesSerializer, this.bytesSerializer);
            this.producer = mockProducer;
            return mockProducer;
        }) {
            // @Override makes the compiler reject this stub if flushState() ever changes
            // signature, instead of the test silently running the real implementation.
            @Override
            protected void flushState() {
                // intentionally a no-op
            }
        };

        // NOTE(review): this collector is initialized with the producer but never used
        // afterwards; kept as-is because init() side effects are not visible from here.
        new RecordCollectorImpl("StreamTask", new LogContext("StreamTaskTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")).init(this.producer);

        this.task.initializeStateStores();
        this.task.initializeTopology();
        this.producer.fenceProducer();

        try {
            this.task.commit();
            Assert.fail("Expected a TaskMigratedException");
        } catch (TaskMigratedException e) {
            MatcherAssert.assertThat(e.migratedTask(), CoreMatchers.is(this.task));
        }
    }

    /**
     * With a fenced producer, {@code close(false, true)} must close the producer without
     * aborting the transaction.
     */
    @Test
    public void shouldOnlyCloseFencedProducerOnUncleanClosedWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducer();

        task.close(false, true);
        task = null;

        Assert.assertFalse(producer.transactionAborted());
        Assert.assertTrue(producer.closed());
    }

    /**
     * With the producer set to be fenced on close, {@code close(false, false)} must
     * return normally and the producer must not be reported as closed.
     */
    @Test
    public void shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducerOnClose();

        task.close(false, false);
        task = null;

        Assert.assertFalse(producer.closed());
    }

    /**
     * When {@code close(true, false)} fails with a {@code RuntimeException}, both
     * punctuating processors and {@code source1} must nevertheless be marked as closed.
     */
    @Test
    public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() {
        task = createTaskThatThrowsException(false);
        task.initializeStateStores();
        task.initializeTopology();

        try {
            task.close(true, false);
            Assert.fail("should have thrown runtime exception");
        } catch (final RuntimeException expected) {
            // The close attempt failed as intended; forget the task.
            task = null;
        }

        Assert.assertTrue(processorSystemTime.closed);
        Assert.assertTrue(processorStreamTime.closed);
        Assert.assertTrue(source1.closed);
    }

    /**
     * With {@code createConfig(true)}, after creating the task and initializing its
     * topology the producer must report transactions as initialized and one in flight.
     */
    @Test
    public void shouldInitAndBeginTransactionOnCreateIfEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        Assert.assertTrue(producer.transactionInitialized());
        Assert.assertTrue(producer.transactionInFlight());
    }

    /**
     * With a fenced producer, {@code initializeTopology()} must throw a
     * {@code TaskMigratedException} whose cause is a {@code ProducerFencedException}.
     */
    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionForBeginTransaction() {
        task = createStatelessTask(createConfig(true), "latest");
        producer.fenceProducer();

        try {
            task.initializeTopology();
            Assert.fail("Should have throws TaskMigratedException");
        } catch (final TaskMigratedException expected) {
            Assert.assertTrue(expected.getCause() instanceof ProducerFencedException);
        }
    }

    /**
     * A task created with {@code createConfig(true)} whose topology was never initialized
     * has no transaction in flight, and {@code close(false, false)} must not throw.
     */
    @Test
    public void shouldNotThrowOnCloseIfTaskWasNotInitializedWithEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");

        Assert.assertFalse(producer.transactionInFlight());

        // Passes if no exception escapes.
        task.close(false, false);
    }

    /**
     * Creating a task with EOS disabled must neither initialize nor begin a transaction.
     */
    @Test
    public void shouldNotInitOrBeginTransactionOnCreateIfEosDisabled() {
        task = createStatelessTask(createConfig(false), "latest");

        Assert.assertFalse(producer.transactionInitialized());
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * Suspending an EOS-enabled task after processing one record must send offsets and commit
     * the transaction, and must not leave a transaction in flight.
     */
    @Test
    public void shouldSendOffsetsAndCommitTransactionButNotStartNewTransactionOnSuspendIfEosEnabled() {
        // arrange: one processed record on partition1
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();

        // act
        task.suspend();

        // assert
        Assert.assertTrue(producer.sentOffsets());
        Assert.assertTrue(producer.transactionCommitted());
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * Suspending an EOS-enabled task that processed nothing must still commit the transaction
     * and leave none in flight.
     */
    @Test
    public void shouldCommitTransactionOnSuspendEvenIfTransactionIsEmptyIfEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.suspend();

        Assert.assertTrue(producer.transactionCommitted());
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * With EOS disabled, suspending after processing one record must not send offsets through
     * the producer, commit a transaction, or have a transaction in flight.
     */
    @Test
    public void shouldNotSendOffsetsAndCommitTransactionNorStartNewTransactionOnSuspendIfEosDisabled() {
        // arrange
        task = createStatelessTask(createConfig(false), "latest");
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();

        // act
        task.suspend();

        // assert
        Assert.assertFalse(producer.sentOffsets());
        Assert.assertFalse(producer.transactionCommitted());
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * When the producer is fenced before {@code suspend()}, the call must throw a
     * TaskMigratedException caused by a RecoverableClientException, and no transaction
     * may have been committed.
     */
    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigratedExceptionInSuspendWhenCommitting() {
        task = createStatelessTask(createConfig(true), "latest");
        producer.fenceProducer();

        try {
            task.suspend();
            Assert.fail("Should have throws TaskMigratedException");
        } catch (TaskMigratedException expected) {
            Assert.assertTrue(expected.getCause() instanceof RecoverableClientException);
        }
        task = null;

        Assert.assertFalse(producer.transactionCommitted());
    }

    /**
     * When the producer is set up via {@code fenceProducerOnClose()}, {@code suspend()} must
     * throw a TaskMigratedException caused by a RecoverableClientException, while the
     * transaction itself is still reported as committed.
     */
    @Test
    public void shouldWrapProducerFencedExceptionWithTaskMigragedExceptionInSuspendWhenClosingProducer() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        producer.fenceProducerOnClose();

        try {
            task.suspend();
            Assert.fail("Should have throws TaskMigratedException");
        } catch (TaskMigratedException expected) {
            Assert.assertTrue(expected.getCause() instanceof RecoverableClientException);
        }

        Assert.assertTrue(producer.transactionCommitted());
    }

    /** Runs the shared resume/task-time scenario with EOS disabled. */
    @Test
    public void shouldInitTaskTimeOnResumeWithEosDisabled() {
        final boolean eosEnabled = false;
        shouldInitTaskTimeOnResume(eosEnabled);
    }

    /** Runs the shared resume/task-time scenario with EOS enabled. */
    @Test
    public void shouldInitTaskTimeOnResumeWithEosEnabled() {
        final boolean eosEnabled = true;
        shouldInitTaskTimeOnResume(eosEnabled);
    }

    /**
     * Shared scenario: partition time and stream time start at -1, become 0 after processing
     * a record with timestamp 0, fall back to -1 on suspend, and are 0 again after resume.
     *
     * @param z whether the task is created with an EOS-enabled config
     */
    private void shouldInitTaskTimeOnResume(boolean z) {
        task = createStatelessTask(createConfig(z), "latest");
        task.initializeTopology();

        // freshly initialized: both times are -1
        MatcherAssert.assertThat(Long.valueOf(task.partitionTime(partition1)), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(Long.valueOf(task.streamTime()), CoreMatchers.is(-1L));

        // after processing one record at offset/timestamp 0: both times are 0
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();
        MatcherAssert.assertThat(Long.valueOf(task.partitionTime(partition1)), CoreMatchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(task.streamTime()), CoreMatchers.is(0L));

        // after suspend: both times are back to -1
        task.suspend();
        MatcherAssert.assertThat(Long.valueOf(task.partitionTime(partition1)), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(Long.valueOf(task.streamTime()), CoreMatchers.is(-1L));

        if (z) {
            // EOS path only: copy the offsets recorded by the producer over to the consumer
            moveCommittedOffsetsFromProducerToConsumer(0L);
        }

        // after resume: both times are 0 again
        task.resume();
        MatcherAssert.assertThat(Long.valueOf(task.partitionTime(partition1)), CoreMatchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(task.streamTime()), CoreMatchers.is(0L));
    }

    /**
     * After suspend, resume and a second topology initialization, an EOS-enabled task must
     * have a transaction in flight again.
     */
    @Test
    public void shouldStartNewTransactionOnResumeIfEosEnabled() {
        // arrange: process one record, then suspend
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();
        task.suspend();

        // act
        task.resume();
        task.initializeTopology();

        // assert
        Assert.assertTrue(producer.transactionInFlight());
    }

    /**
     * With EOS disabled, a suspend/resume cycle must not leave a transaction in flight.
     */
    @Test
    public void shouldNotStartNewTransactionOnResumeIfEosDisabled() {
        // arrange
        task = createStatelessTask(createConfig(false), "latest");
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();
        task.suspend();

        // act
        task.resume();

        // assert
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * With EOS enabled, a transaction must be in flight after {@code commit()}.
     */
    @Test
    public void shouldStartNewTransactionOnCommitIfEosEnabled() {
        // arrange
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();

        // act
        task.commit();

        // assert
        Assert.assertTrue(producer.transactionInFlight());
    }

    /**
     * With EOS disabled, no transaction may be in flight after {@code commit()}.
     */
    @Test
    public void shouldNotStartNewTransactionOnCommitIfEosDisabled() {
        // arrange
        task = createStatelessTask(createConfig(false), "latest");
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 0L)));
        task.process();

        // act
        task.commit();

        // assert
        Assert.assertFalse(producer.transactionInFlight());
    }

    /**
     * An unclean zombie close of an EOS-enabled task must not abort the transaction.
     */
    @Test
    public void shouldNotAbortTransactionOnZombieClosedIfEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");

        task.close(false, true);
        task = null;

        Assert.assertFalse(producer.transactionAborted());
    }

    /**
     * An unclean close of a task with EOS disabled must not abort any transaction.
     */
    @Test
    public void shouldNotAbortTransactionOnDirtyClosedIfEosDisabled() {
        task = createStatelessTask(createConfig(false), "latest");

        task.close(false, false);
        task = null;

        Assert.assertFalse(producer.transactionAborted());
    }

    /**
     * A clean close of an initialized EOS-enabled task must close the producer.
     */
    @Test
    public void shouldCloseProducerOnCloseWhenEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.close(true, false);
        task = null;

        Assert.assertTrue(producer.closed());
    }

    /**
     * An unclean, non-zombie close of an initialized EOS-enabled task must close the producer.
     */
    @Test
    public void shouldCloseProducerOnUncleanCloseNotZombieWhenEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.close(false, false);
        task = null;

        Assert.assertTrue(producer.closed());
    }

    /**
     * An unclean zombie close of an initialized EOS-enabled task must close the producer.
     */
    @Test
    public void shouldCloseProducerOnUncleanCloseIsZombieWhenEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");
        task.initializeTopology();

        task.close(false, true);
        task = null;

        Assert.assertTrue(producer.closed());
    }

    /**
     * Committing the task built by {@code createTaskThatThrowsException} must throw.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
        task = createTaskThatThrowsException(false);
        task.initializeStateStores();
        task.initializeTopology();

        try {
            task.commit();
            Assert.fail("should have thrown an exception");
        } catch (Exception expected) {
            // intentionally ignored: reaching this handler is the passing outcome
        }
    }

    /**
     * Suspending the task built by {@code createTaskThatThrowsException} must throw.
     * The task is held in a local variable rather than in the {@code task} field.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
        final StreamTask failingTask = createTaskThatThrowsException(false);
        failingTask.initializeStateStores();
        failingTask.initializeTopology();

        try {
            failingTask.suspend();
            Assert.fail("should have thrown an exception");
        } catch (Exception expected) {
            // intentionally ignored: reaching this handler is the passing outcome
        }
    }

    /**
     * When a clean close of the stateful task throws, the state store must nevertheless
     * end up not open.
     */
    @Test
    public void shouldCloseStateManagerIfFailureOnTaskClose() {
        task = createStatefulTaskThatThrowsExceptionOnClose();
        task.initializeStateStores();
        task.initializeTopology();

        try {
            task.close(true, false);
            Assert.fail("should have thrown an exception");
        } catch (Exception expected) {
            // intentionally ignored: the failure on close is the scenario under test
        }
        task = null;

        Assert.assertFalse(stateStore.isOpen());
    }

    /**
     * An unclean close of a task whose topology was never initialized must not throw.
     */
    @Test
    public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
        try {
            // creation and close both happen inside the guarded region
            createTaskThatThrowsException(false).close(false, false);
        } catch (Exception unexpected) {
            Assert.fail("should have not closed non-initialized topology");
        }
    }

    /**
     * {@code initializeStateStores()} must return true for the stateful task created with
     * the flag set to false (empty store-to-changelog map).
     */
    @Test
    public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
        final boolean initialized = createStatefulTask(createConfig(false), false).initializeStateStores();

        Assert.assertTrue(initialized);
    }

    /**
     * {@code initializeStateStores()} must return false for the stateful task created with
     * the flag set to true (store mapped to "store-changelog").
     */
    @Test
    public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
        final boolean initialized = createStatefulTask(createConfig(false), true).initializeStateStores();

        Assert.assertFalse(initialized);
    }

    /**
     * After processing and committing a record at offset 10 on a repartition topic (and one
     * at offset 5 on topic1), {@code purgableOffsets()} must contain only the repartition
     * partition, mapped to 11.
     */
    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        // arrange: a topology in which source2 reads from the repartition topic
        final TopicPartition repartition = new TopicPartition("repartition", 1);
        final ProcessorTopology topology = withRepartitionTopics(
            Arrays.asList(source1, source2),
            Utils.mkMap(Utils.mkEntry("topic1", source1), Utils.mkEntry(repartition.topic(), source2)),
            Collections.singleton(repartition.topic()));
        consumer.assign(Arrays.asList(partition1, repartition));
        consumer.updateBeginningOffsets(Utils.mkMap(Utils.mkEntry(repartition, 0L)));

        task = new StreamTask(
            taskId00,
            Utils.mkSet(partition1, repartition),
            topology,
            consumer,
            changelogReader,
            createConfig(false),
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            });
        task.initializeStateStores();
        task.initializeTopology();

        // act: one record per partition, both processed, then committed
        task.addRecords(partition1, Collections.singletonList(getConsumerRecord(partition1, 5L)));
        task.addRecords(repartition, Collections.singletonList(getConsumerRecord(repartition, 10L)));
        Assert.assertTrue(task.process());
        Assert.assertTrue(task.process());
        task.commit();

        // assert
        MatcherAssert.assertThat(task.purgableOffsets(), CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    /**
     * A clean close of an EOS-enabled task created without initializing its topology must
     * throw IllegalStateException, and the producer must still end up closed.
     */
    @Test
    public void shouldThrowOnCleanCloseTaskWhenEosEnabledIfTransactionInFlight() {
        task = createStatelessTask(createConfig(true), "latest");

        try {
            task.close(true, false);
            Assert.fail("should have throw IllegalStateException");
        } catch (IllegalStateException expected) {
            // intentionally ignored: this exception is the passing outcome
        }
        task = null;

        Assert.assertTrue(producer.closed());
    }

    /**
     * With EOS enabled, a record sent from a wall-clock punctuation through a separately
     * built record collector must show up exactly once in the producer history after commit.
     */
    @Test
    public void shouldAlwaysCommitIfEosEnabled() {
        task = createStatelessTask(createConfig(true), "latest");

        // a stand-alone collector wired to the same mock producer the task uses
        final RecordCollectorImpl collector = new RecordCollectorImpl(
            "StreamTask",
            new LogContext("StreamTaskTest "),
            new DefaultProductionExceptionHandler(),
            new Metrics().sensor("dropped-records"));
        collector.init(producer);

        task.initializeStateStores();
        task.initializeTopology();

        task.punctuate(processorSystemTime, 5L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            collector.send("result-topic1", 3, 5, (Headers) null, 0, Long.valueOf(time.milliseconds()), new IntegerSerializer(), new IntegerSerializer());
        });
        task.commit();

        Assert.assertEquals(1L, producer.history().size());
    }

    /**
     * When the consumer's {@code committed(...)} throws AuthorizationException, initializing
     * the optimized stateful task is expected to fail with ProcessorStateException.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() {
        final StreamTask optimizedTask = createOptimizedStatefulTask(
            createConfig(false),
            mockConsumerWithCommittedException(new AuthorizationException("message")));

        optimizedTask.initializeMetadata();
        optimizedTask.initializeStateStores();
    }

    /**
     * When the consumer's {@code committed(...)} throws KafkaException, initializing the
     * optimized stateful task is expected to fail with ProcessorStateException.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
        final StreamTask optimizedTask = createOptimizedStatefulTask(
            createConfig(false),
            mockConsumerWithCommittedException(new KafkaException("message")));

        optimizedTask.initializeMetadata();
        optimizedTask.initializeStateStores();
    }

    /**
     * When the consumer's {@code committed(...)} throws WakeupException, that same exception
     * type is expected to escape from initializing the optimized stateful task.
     */
    @Test(expected = WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
        final StreamTask optimizedTask = createOptimizedStatefulTask(
            createConfig(false),
            mockConsumerWithCommittedException(new WakeupException()));

        optimizedTask.initializeMetadata();
        optimizedTask.initializeStateStores();
    }

    /**
     * Builds a MockConsumer (EARLIEST reset strategy) whose {@code committed(Set)} always
     * throws the supplied exception.
     *
     * @param runtimeException the exception instance thrown on every {@code committed} call
     * @return the failing consumer
     */
    private Consumer<byte[], byte[]> mockConsumerWithCommittedException(final RuntimeException runtimeException) {
        return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
            // the requested partitions are ignored; the call fails unconditionally
            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw runtimeException;
            }
        };
    }

    /**
     * Takes the first entry of the producer's consumer-group offset history, looks up the
     * offsets stored under APPLICATION_ID for partition1, decodes the timestamp from their
     * metadata, checks it against {@code j}, and commits a matching entry on the consumer.
     *
     * @param j the timestamp expected to be encoded in the producer-side offset metadata
     */
    private void moveCommittedOffsetsFromProducerToConsumer(long j) {
        // walk history[0] -> APPLICATION_ID -> partition1 one step at a time
        final Map<?, ?> offsetsByGroup = (Map<?, ?>) producer.consumerGroupOffsetsHistory().get(0);
        final Map<?, ?> offsetsByPartition = (Map<?, ?>) offsetsByGroup.get(APPLICATION_ID);
        final OffsetAndMetadata sentByProducer = (OffsetAndMetadata) offsetsByPartition.get(partition1);

        final long decodedTimestamp = task.decodeTimestamp(sentByProducer.metadata());
        MatcherAssert.assertThat(Long.valueOf(decodedTimestamp), CoreMatchers.is(Long.valueOf(j)));

        // the decoded value is used both as the offset and, re-encoded, as the metadata
        consumer.commitSync(Collections.singletonMap(
            partition1,
            new OffsetAndMetadata(decodedTimestamp, task.encodeTimestamp(decodedTimestamp))));
    }

    /**
     * Creates a task over partition1 with a single source (source1 on "topic1") and one
     * MockKeyValueStore named "store" that is mapped to "topic1" itself.
     *
     * @param streamsConfig the config handed to the task
     * @param consumer      the consumer handed to the task (the method parameter, not the field)
     * @return the new task; its producer supplier also stores the created producer in {@code producer}
     */
    private StreamTask createOptimizedStatefulTask(StreamsConfig streamsConfig, Consumer<byte[], byte[]> consumer) {
        return new StreamTask(
            taskId00,
            Utils.mkSet(partition1),
            ProcessorTopologyFactories.with(
                Collections.singletonList(source1),
                Utils.mkMap(Utils.mkEntry("topic1", source1)),
                Collections.singletonList(new MockKeyValueStore("store", true)),
                Collections.singletonMap("store", "topic1")),
            consumer,
            changelogReader,
            streamsConfig,
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            });
    }

    /**
     * Creates a task with source1/source2 on "topic1"/"topic2" and one MockKeyValueStore
     * named "store".
     *
     * @param streamsConfig the config handed to the task
     * @param z             passed to the store constructor; when true the store is also mapped
     *                      to "store-changelog", otherwise the store-to-changelog map is empty
     * @return the new task; its producer supplier also stores the created producer in {@code producer}
     */
    private StreamTask createStatefulTask(StreamsConfig streamsConfig, boolean z) {
        return new StreamTask(
            taskId00,
            partitions,
            ProcessorTopologyFactories.with(
                Arrays.asList(source1, source2),
                Utils.mkMap(Utils.mkEntry("topic1", source1), Utils.mkEntry("topic2", source2)),
                Collections.singletonList(new MockKeyValueStore("store", z)),
                z ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap()),
            consumer,
            changelogReader,
            streamsConfig,
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            });
    }

    /**
     * Creates an EOS-configured task with source1 on "topic1", source3 on "topic2", the
     * shared {@code stateStore}, and no store-to-changelog mapping.
     *
     * @return the new task; its producer supplier also stores the created producer in {@code producer}
     */
    private StreamTask createStatefulTaskThatThrowsExceptionOnClose() {
        return new StreamTask(
            taskId00,
            partitions,
            ProcessorTopologyFactories.with(
                Arrays.asList(source1, source3),
                Utils.mkMap(Utils.mkEntry("topic1", source1), Utils.mkEntry("topic2", source3)),
                Collections.singletonList(stateStore),
                Collections.emptyMap()),
            consumer,
            changelogReader,
            createConfig(true),
            streamsMetrics,
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            });
    }

    /**
     * Creates a store-less task with source1/source2 on "topic1"/"topic2", each wired to
     * both punctuation processors, using a fresh StreamsMetricsImpl.
     *
     * @param streamsConfig the config handed to the task
     * @param str           third argument of the StreamsMetricsImpl constructor
     * @return the new task; its producer supplier also stores the created producer in {@code producer}
     */
    private StreamTask createStatelessTask(StreamsConfig streamsConfig, String str) {
        final ProcessorTopology topology = withSources(
            Arrays.asList(source1, source2, processorStreamTime, processorSystemTime),
            Utils.mkMap(Utils.mkEntry("topic1", source1), Utils.mkEntry("topic2", source2)));

        // attach both processors as children of each source
        source1.addChild(processorStreamTime);
        source2.addChild(processorStreamTime);
        source1.addChild(processorSystemTime);
        source2.addChild(processorSystemTime);

        return new StreamTask(
            taskId00,
            partitions,
            topology,
            consumer,
            changelogReader,
            streamsConfig,
            new StreamsMetricsImpl(metrics, "test", str),
            stateDirectory,
            (ThreadCache) null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            });
    }

    /**
     * Creates a task with source1 on "topic1" and source3 on "topic2", each wired to both
     * punctuation processors, whose {@code flushState()} always throws
     * {@code RuntimeException("KABOOM!")}.
     *
     * @param z passed to {@code createConfig} (EOS flag)
     * @return the failing task; its producer supplier also stores the created producer in {@code producer}
     */
    private StreamTask createTaskThatThrowsException(boolean z) {
        final ProcessorTopology topology = withSources(
            Arrays.asList(source1, source3, processorStreamTime, processorSystemTime),
            Utils.mkMap(Utils.mkEntry("topic1", source1), Utils.mkEntry("topic2", source3)));

        // attach both processors as children of each source
        source1.addChild(processorStreamTime);
        source3.addChild(processorStreamTime);
        source1.addChild(processorSystemTime);
        source3.addChild(processorSystemTime);

        return new StreamTask(
            taskId00,
            partitions,
            topology,
            consumer,
            changelogReader,
            createConfig(z),
            streamsMetrics,
            stateDirectory,
            null,
            time,
            () -> {
                final MockProducer<byte[], byte[]> freshProducer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
                producer = freshProducer;
                return freshProducer;
            }) {
            // every flush attempt fails, which is what the callers rely on
            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
    }

    /**
     * Builds a CREATE_TIME record for the given partition carrying the shared
     * {@code recordKey} and {@code recordValue}.
     *
     * @param topicPartition supplies the record's topic and partition
     * @param j              passed as both the third and fourth constructor arguments
     *                       (offset and timestamp in the ConsumerRecord constructor)
     * @return the new record
     */
    private ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long j) {
        final String topic = topicPartition.topic();
        final int partition = topicPartition.partition();
        return new ConsumerRecord<>(
            topic,
            partition,
            j,
            j,
            TimestampType.CREATE_TIME,
            0L,
            0,
            0,
            recordKey,
            recordValue);
    }

    /**
     * Synthetic accessor that stores a value into {@code punctuatedAt} on the given test
     * instance and returns the stored value.
     *
     * <p>The decompiler could not decode this method (it failed on the {@code dup2_x1}
     * instruction) and emitted a body that unconditionally threw
     * {@code UnsupportedOperationException}. The body below is reconstructed from the
     * surviving pseudo-code ({@code r0.punctuatedAt = r1}) and the {@code long} return type:
     * the value is duplicated beneath the object reference, stored into the field, and the
     * duplicate is returned.
     * NOTE(review): presumably generated by javac so nested classes or lambdas can assign the
     * field; confirm against the original source.
     *
     * @param r6 the test instance whose {@code punctuatedAt} field is written
     * @param r7 the value to store
     * @return the value that was stored ({@code r7})
     */
    static /* synthetic */ long access$202(org.apache.kafka.streams.processor.internals.StreamTaskTest r6, long r7) {
        r6.punctuatedAt = r7;
        return r7;
    }

    // Empty static initializer: it executes no statements.
    // NOTE(review): likely left behind by the decompiler; confirm before removing.
    static {
    }
}
