package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.ProcessingContext;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.OngoingStubbing;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTaskTest.class */
public class WorkerSinkTaskTest {
    private static final int PARTITION = 12;
    private static final long FIRST_OFFSET = 45;
    private static final int KEY = 12;
    private static final String VALUE = "VALUE";
    private static final TaskConfig TASK_CONFIG;
    private MockTime time;
    private WorkerSinkTask workerTask;

    @Mock
    private SinkTask sinkTask;
    private WorkerConfig workerConfig;
    private MockConnectMetrics metrics;

    @Mock
    private PluginClassLoader pluginLoader;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<ConsumerRecord<byte[], byte[]>, SinkRecord> transformationChain;

    @Mock
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private KafkaConsumer<byte[], byte[]> consumer;

    @Mock
    private ErrorHandlingMetrics errorHandlingMetrics;
    private long recordsReturnedTp1;
    private long recordsReturnedTp3;
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
    private static final byte[] RAW_KEY = "key".getBytes();
    private static final byte[] RAW_VALUE = "value".getBytes();
    private static final String TOPIC = "test";
    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, 12);
    private static final int PARTITION2 = 13;
    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
    private static final int PARTITION3 = 14;
    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
    private static final Set<TopicPartition> INITIAL_ASSIGNMENT = new HashSet(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
    private static final Map<String, String> TASK_PROPS = new HashMap();
    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private final ConnectorTaskId taskId1 = new ConnectorTaskId("job", 1);
    private final TargetState initialState = TargetState.STARTED;
    private final ArgumentCaptor<WorkerSinkTaskContext> sinkTaskContext = ArgumentCaptor.forClass(WorkerSinkTaskContext.class);
    private final ArgumentCaptor<ConsumerRebalanceListener> rebalanceListener = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);

    @Rule
    public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    @Before
    public void setUp() {
        this.time = new MockTime();
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        this.workerConfig = new StandaloneConfig(hashMap);
        this.metrics = new MockConnectMetrics(this.time);
        this.recordsReturnedTp1 = 0L;
        this.recordsReturnedTp3 = 0L;
    }

    private void createTask(TargetState targetState) {
        createTask(targetState, this.keyConverter, this.valueConverter, this.headerConverter);
    }

    private void createTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter) {
        createTask(targetState, converter, converter2, headerConverter, RetryWithToleranceOperatorTest.noopOperator(), Collections::emptyList);
    }

    private void createTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter, RetryWithToleranceOperator<ConsumerRecord<byte[], byte[]>> retryWithToleranceOperator, Supplier<List<ErrorReporter<ConsumerRecord<byte[], byte[]>>>> supplier) {
        this.workerTask = new WorkerSinkTask(this.taskId, this.sinkTask, this.statusListener, targetState, this.workerConfig, ClusterConfigState.EMPTY, this.metrics, converter, converter2, this.errorHandlingMetrics, headerConverter, this.transformationChain, this.consumer, this.pluginLoader, this.time, retryWithToleranceOperator, (WorkerErrantRecordReporter) null, this.statusBackingStore, supplier);
    }

    @After
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
    }

    @Test
    public void testStartPaused() {
        createTask(TargetState.PAUSED);
        expectPollInitialAssignment();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        this.time.sleep(10000L);
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(INITIAL_ASSIGNMENT);
        assertSinkMetricValue("partition-count", 2.0d);
        assertTaskMetricValue("status", "paused");
        assertTaskMetricValue("running-ratio", 0.0d);
        assertTaskMetricValue("pause-ratio", 1.0d);
        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
    }

    @Test
    public void testPause() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenThrow(new Throwable[]{new WakeupException()}).thenAnswer(expectConsumerPoll(0)).thenThrow(new Throwable[]{new WakeupException()}).thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put(ArgumentMatchers.anyList());
        this.workerTask.transitionTo(TargetState.PAUSED);
        this.time.sleep(10000L);
        assertSinkMetricValue("partition-count", 2.0d);
        assertSinkMetricValue("sink-record-read-total", 1.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        assertSinkMetricValue("sink-record-active-count", 1.0d);
        assertSinkMetricValue("sink-record-active-count-max", 1.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.333333d);
        assertSinkMetricValue("offset-commit-seq-no", 0.0d);
        assertSinkMetricValue("offset-commit-completion-rate", 0.0d);
        assertSinkMetricValue("offset-commit-completion-total", 0.0d);
        assertSinkMetricValue("offset-commit-skip-rate", 0.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 1.0d);
        assertTaskMetricValue("batch-size-avg", 0.5d);
        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 0.0d);
        this.workerTask.iteration();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onPause(this.taskId);
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(INITIAL_ASSIGNMENT);
        ((KafkaConsumer) Mockito.verify(this.consumer)).wakeup();
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        this.workerTask.iteration();
        this.time.sleep(30000L);
        assertSinkMetricValue("offset-commit-seq-no", 1.0d);
        assertSinkMetricValue("offset-commit-completion-rate", 0.0333d);
        assertSinkMetricValue("offset-commit-completion-total", 1.0d);
        assertSinkMetricValue("offset-commit-skip-rate", 0.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "paused");
        assertTaskMetricValue("running-ratio", 0.25d);
        assertTaskMetricValue("pause-ratio", 0.75d);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(3))).put(ArgumentMatchers.anyList());
        this.workerTask.transitionTo(TargetState.STARTED);
        this.workerTask.iteration();
        this.workerTask.iteration();
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onResume(this.taskId);
        ((KafkaConsumer) Mockito.verify(this.consumer, Mockito.times(2))).wakeup();
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            ((KafkaConsumer) Mockito.verify(this.consumer)).resume(Collections.singleton(topicPartition));
        });
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(4))).put(ArgumentMatchers.anyList());
    }

    @Test
    public void testShutdown() throws Exception {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put(ArgumentMatchers.anyList());
        ((KafkaConsumer) Mockito.doAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(INITIAL_ASSIGNMENT);
            return null;
        }).when(this.consumer)).close();
        this.workerTask.stop();
        ((KafkaConsumer) Mockito.verify(this.consumer)).wakeup();
        this.workerTask.close();
        ((SinkTask) Mockito.verify(this.sinkTask)).stop();
        ((KafkaConsumer) Mockito.verify(this.consumer)).close();
        ((HeaderConverter) Mockito.verify(this.headerConverter)).close();
    }

    @Test
    public void testPollRedelivery() {
        createTask(this.initialState);
        expectTaskGetTopic();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(0)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        ((SinkTask) Mockito.doNothing().doThrow(new Throwable[]{new RetriableException("retry")}).doNothing().when(this.sinkTask)).put(ArgumentMatchers.anyList());
        this.workerTask.iteration();
        this.time.sleep(10000L);
        verifyPollInitialAssignment();
        ((SinkTask) Mockito.verify(this.sinkTask)).put(ArgumentMatchers.anyList());
        assertSinkMetricValue("partition-count", 2.0d);
        assertSinkMetricValue("sink-record-read-total", 0.0d);
        assertSinkMetricValue("sink-record-send-total", 0.0d);
        assertSinkMetricValue("sink-record-active-count", 0.0d);
        assertSinkMetricValue("sink-record-active-count-max", 0.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.0d);
        assertSinkMetricValue("offset-commit-seq-no", 0.0d);
        assertSinkMetricValue("offset-commit-completion-rate", 0.0d);
        assertSinkMetricValue("offset-commit-completion-total", 0.0d);
        assertSinkMetricValue("offset-commit-skip-rate", 0.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 0.0d);
        assertTaskMetricValue("batch-size-avg", 0.0d);
        assertTaskMetricValue("offset-commit-max-time-ms", Double.NaN);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 0.0d);
        this.workerTask.iteration();
        ((KafkaConsumer) Mockito.verify(this.consumer, Mockito.times(3))).assignment();
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(INITIAL_ASSIGNMENT);
        this.workerTask.iteration();
        this.time.sleep(30000L);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(3))).put(ArgumentMatchers.anyList());
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            ((KafkaConsumer) Mockito.verify(this.consumer)).resume(Collections.singleton(topicPartition));
        });
        assertSinkMetricValue("sink-record-read-total", 1.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        assertSinkMetricValue("sink-record-active-count", 1.0d);
        assertSinkMetricValue("sink-record-active-count-max", 1.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.5d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("batch-size-max", 1.0d);
        assertTaskMetricValue("batch-size-avg", 0.5d);
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.time.sleep(10000L);
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap, (Exception) null);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(4))).put(ArgumentMatchers.anyList());
        assertSinkMetricValue("offset-commit-completion-total", 1.0d);
    }

    @Test
    public void testPollRedeliveryWithConsumerRebalance() {
        createTask(this.initialState);
        expectTaskGetTopic();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        HashSet hashSet = new HashSet(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
        Mockito.when(this.consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT, new Set[]{INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT}).thenReturn(hashSet, new Set[]{hashSet, hashSet}).thenReturn(Collections.singleton(TOPIC_PARTITION3), new Set[]{Collections.singleton(TOPIC_PARTITION3), Collections.singleton(TOPIC_PARTITION3)});
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            Mockito.when(Long.valueOf(this.consumer.position(topicPartition))).thenReturn(Long.valueOf(FIRST_OFFSET));
        });
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION3))).thenReturn(Long.valueOf(FIRST_OFFSET));
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        }).thenAnswer(expectConsumerPoll(1)).thenAnswer(invocationOnMock2 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(Collections.emptySet());
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
            return ConsumerRecords.empty();
        }).thenAnswer(expectConsumerPoll(0)).thenAnswer(invocationOnMock3 -> {
            ConsumerRecord consumerRecord = new ConsumerRecord(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, RAW_VALUE);
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(INITIAL_ASSIGNMENT);
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.emptyList());
            return new ConsumerRecords(Collections.singletonMap(TOPIC_PARTITION3, Collections.singletonList(consumerRecord)));
        });
        expectConversionAndTransformation(null, new RecordHeaders());
        ((SinkTask) Mockito.doNothing().doThrow(new Throwable[]{new RetriableException("retry")}).doThrow(new Throwable[]{new RetriableException("retry")}).doThrow(new Throwable[]{new RetriableException("retry")}).doNothing().when(this.sinkTask)).put((Collection) ArgumentMatchers.any(Collection.class));
        this.workerTask.iteration();
        this.workerTask.iteration();
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(INITIAL_ASSIGNMENT);
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).open(Collections.singleton(TOPIC_PARTITION3));
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(hashSet);
        this.workerTask.iteration();
        Map map = (Map) INITIAL_ASSIGNMENT.stream().collect(Collectors.toMap(Function.identity(), topicPartition2 -> {
            return new OffsetAndMetadata(FIRST_OFFSET);
        }));
        Mockito.when(this.sinkTask.preCommit(map)).thenReturn(map);
        Set singleton = Collections.singleton(TOPIC_PARTITION3);
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).close(INITIAL_ASSIGNMENT);
        singleton.forEach(topicPartition3 -> {
            ((KafkaConsumer) Mockito.verify(this.consumer)).resume(Collections.singleton(topicPartition3));
        });
    }

    @Test
    public void testErrorInRebalancePartitionLoss() {
        RuntimeException runtimeException = new RuntimeException("Revocation error");
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsLost(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        });
        ((SinkTask) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.sinkTask)).close(INITIAL_ASSIGNMENT);
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        Assert.assertEquals(runtimeException, (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.workerTask.iteration();
        }));
    }

    @Test
    public void testErrorInRebalancePartitionRevocation() {
        RuntimeException runtimeException = new RuntimeException("Revocation error");
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        });
        expectRebalanceRevocationError(runtimeException);
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        Assert.assertEquals(runtimeException, (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.workerTask.iteration();
        }));
    }

    @Test
    public void testErrorInRebalancePartitionAssignment() {
        RuntimeException runtimeException = new RuntimeException("Assignment error");
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(INITIAL_ASSIGNMENT);
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        });
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        expectRebalanceAssignmentError(runtimeException);
        try {
            Assert.assertEquals(runtimeException, (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
                this.workerTask.iteration();
            }));
        } finally {
            ((SinkTask) Mockito.verify(this.sinkTask)).close(INITIAL_ASSIGNMENT);
        }
    }

    @Test
    public void testPartialRevocationAndAssignment() {
        createTask(this.initialState);
        Mockito.when(this.consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(Collections.singleton(TOPIC_PARTITION2)).thenReturn(Collections.singleton(TOPIC_PARTITION2)).thenReturn(new HashSet(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).thenReturn(new HashSet(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT);
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            Mockito.when(Long.valueOf(this.consumer.position(topicPartition))).thenReturn(Long.valueOf(FIRST_OFFSET));
        });
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        }).thenAnswer(invocationOnMock2 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.emptySet());
            return ConsumerRecords.empty();
        }).thenAnswer(invocationOnMock3 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(Collections.emptySet());
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
            return ConsumerRecords.empty();
        }).thenAnswer(invocationOnMock4 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsLost(Collections.singleton(TOPIC_PARTITION3));
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION));
            return ConsumerRecords.empty();
        });
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION3))).thenReturn(Long.valueOf(FIRST_OFFSET));
        this.workerTask.iteration();
        verifyPollInitialAssignment();
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).close(Collections.singleton(TOPIC_PARTITION));
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put(Collections.emptyList());
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).open(Collections.singleton(TOPIC_PARTITION3));
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(3))).put(Collections.emptyList());
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).close(Collections.singleton(TOPIC_PARTITION3));
        ((SinkTask) Mockito.verify(this.sinkTask)).open(Collections.singleton(TOPIC_PARTITION));
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(4))).put(Collections.emptyList());
    }

    @Test
    public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
        createTask(this.initialState);
        expectTaskGetTopic();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        Mockito.when(this.consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT, new Set[]{INITIAL_ASSIGNMENT}).thenReturn(new HashSet(Collections.singletonList(TOPIC_PARTITION2))).thenReturn(new HashSet(Collections.singletonList(TOPIC_PARTITION2))).thenReturn(new HashSet(Collections.singletonList(TOPIC_PARTITION2))).thenReturn(new HashSet(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).thenReturn(new HashSet(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3))).thenReturn(new HashSet(Arrays.asList(TOPIC_PARTITION2, TOPIC_PARTITION3)));
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            Mockito.when(Long.valueOf(this.consumer.position(topicPartition))).thenReturn(Long.valueOf(FIRST_OFFSET));
        });
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION3))).thenReturn(Long.valueOf(FIRST_OFFSET));
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        }).thenAnswer(expectConsumerPoll(1)).thenAnswer(invocationOnMock2 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.emptySet());
            return ConsumerRecords.empty();
        }).thenAnswer(invocationOnMock3 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(Collections.emptySet());
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
            return ConsumerRecords.empty();
        }).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        ((KafkaConsumer) Mockito.doNothing().when(this.consumer)).commitSync(hashMap);
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).close(Collections.singleton(TOPIC_PARTITION));
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put(Collections.emptyList());
        this.workerTask.iteration();
        ((SinkTask) Mockito.verify(this.sinkTask)).open(Collections.singleton(TOPIC_PARTITION3));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        hashMap2.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap2)).thenThrow(new Throwable[]{new ConnectException("Failed to flush")});
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION2, FIRST_OFFSET);
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION3, FIRST_OFFSET);
    }

    @Test
    public void testWakeupInCommitSyncCausesRetry() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.time.sleep(30000L);
        this.workerTask.initializeAndStart();
        this.time.sleep(30000L);
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(INITIAL_ASSIGNMENT);
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        });
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        this.time.sleep(30000L);
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        ((KafkaConsumer) Mockito.doThrow(new Throwable[]{new WakeupException()}).doNothing().when(this.consumer)).commitSync(hashMap);
        this.workerTask.iteration();
        this.workerTask.iteration();
        this.time.sleep(30000L);
        ((SinkTask) Mockito.verify(this.sinkTask)).close(INITIAL_ASSIGNMENT);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).open(INITIAL_ASSIGNMENT);
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            ((KafkaConsumer) Mockito.verify(this.consumer)).resume(Collections.singleton(topicPartition));
        });
        ((TaskStatus.Listener) Mockito.verify(this.statusListener)).onResume(this.taskId);
        assertSinkMetricValue("partition-count", 2.0d);
        assertSinkMetricValue("sink-record-read-total", 1.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        assertSinkMetricValue("sink-record-active-count", 0.0d);
        assertSinkMetricValue("sink-record-active-count-max", 1.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.33333d);
        assertSinkMetricValue("offset-commit-seq-no", 1.0d);
        assertSinkMetricValue("offset-commit-completion-total", 1.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 1.0d);
        assertTaskMetricValue("batch-size-avg", 1.0d);
        assertTaskMetricValue("offset-commit-max-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 1.0d);
    }

    @Test
    public void testWakeupNotThrownDuringShutdown() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(invocationOnMock -> {
            this.workerTask.stop();
            return new ConsumerRecords(Collections.emptyMap());
        });
        expectConversionAndTransformation(null, new RecordHeaders());
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        ((KafkaConsumer) Mockito.doThrow(new Throwable[]{new WakeupException()}).doNothing().when(this.consumer)).commitSync(hashMap);
        this.workerTask.execute();
        Assert.assertEquals(0L, this.workerTask.commitFailures());
        ((KafkaConsumer) Mockito.verify(this.consumer)).wakeup();
        ((SinkTask) Mockito.verify(this.sinkTask)).close((Collection) ArgumentMatchers.any(Collection.class));
    }

    @Test
    public void testRequestCommit() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.time.sleep(30000L);
        this.workerTask.iteration();
        assertSinkMetricValue("partition-count", 2.0d);
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        this.workerTask.iteration();
        assertSinkMetricValue("partition-count", 2.0d);
        assertSinkMetricValue("sink-record-read-total", 1.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        assertSinkMetricValue("sink-record-active-count", 1.0d);
        assertSinkMetricValue("sink-record-active-count-max", 1.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.333333d);
        assertSinkMetricValue("offset-commit-seq-no", 0.0d);
        assertSinkMetricValue("offset-commit-completion-total", 0.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 1.0d);
        assertTaskMetricValue("batch-size-avg", 0.5d);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 0.0d);
        long nextCommit = this.workerTask.getNextCommit();
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        Assert.assertTrue(((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).isCommitRequested());
        Assert.assertNotEquals(hashMap, this.workerTask.lastCommittedOffsets());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        this.time.sleep(10000L);
        this.workerTask.iteration();
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap, (Exception) null);
        this.time.sleep(10000L);
        Assert.assertFalse(((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).isCommitRequested());
        Assert.assertEquals(hashMap, this.workerTask.lastCommittedOffsets());
        Assert.assertEquals(0L, this.workerTask.commitFailures());
        Assert.assertEquals("Should have only advanced by 40 seconds", nextCommit + 40000, this.workerTask.getNextCommit());
        assertSinkMetricValue("partition-count", 2.0d);
        assertSinkMetricValue("sink-record-read-total", 1.0d);
        assertSinkMetricValue("sink-record-send-total", 1.0d);
        assertSinkMetricValue("sink-record-active-count", 0.0d);
        assertSinkMetricValue("sink-record-active-count-max", 1.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.2d);
        assertSinkMetricValue("offset-commit-seq-no", 1.0d);
        assertSinkMetricValue("offset-commit-completion-total", 1.0d);
        assertSinkMetricValue("offset-commit-skip-total", 0.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 1.0d);
        assertTaskMetricValue("batch-size-avg", 0.33333d);
        assertTaskMetricValue("offset-commit-max-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 1.0d);
    }

    @Test
    public void testPreCommit() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(2)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Assert.assertEquals(hashMap, this.workerTask.currentOffsets());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION, new OffsetAndMetadata(47L));
        hashMap2.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap3.put(TOPIC_PARTITION2, new OffsetAndMetadata(46L));
        hashMap3.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap2)).thenReturn(hashMap3);
        this.workerTask.iteration();
        HashMap hashMap4 = new HashMap();
        hashMap4.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap4.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Assert.assertEquals(hashMap2, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap, this.workerTask.lastCommittedOffsets());
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap4), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap4, (Exception) null);
        Assert.assertEquals(hashMap4, this.workerTask.lastCommittedOffsets());
    }

    @Test
    public void testPreCommitFailure() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(2)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(47L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenThrow(new Throwable[]{new ConnectException("Failed to flush")});
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION, FIRST_OFFSET);
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION2, FIRST_OFFSET);
    }

    @Test
    public void testIgnoredCommit() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Assert.assertEquals(hashMap, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap, this.workerTask.lastCommittedOffsets());
        this.workerTask.iteration();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap2.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap2)).thenReturn(hashMap);
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
    }

    @Test
    public void testLongRunningCommitWithoutTimeout() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation(null, new RecordHeaders());
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        this.workerTask.iteration();
        Assert.assertEquals(hashMap, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap, this.workerTask.lastCommittedOffsets());
        this.time.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        this.workerTask.iteration();
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap2.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap2)).thenReturn(hashMap2);
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        Assert.assertTrue("Expected worker to be in the process of committing offsets", this.workerTask.isCommitting());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap2), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap2, (Exception) null);
        Assert.assertEquals(hashMap2, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap2, this.workerTask.lastCommittedOffsets());
        Assert.assertFalse(this.workerTask.isCommitting());
    }

    @Test
    public void testSinkTasksHandleCloseErrors() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation(null, new RecordHeaders());
        ((SinkTask) Mockito.doNothing().doAnswer(invocationOnMock -> {
            this.workerTask.stop();
            return null;
        }).when(this.sinkTask)).put(ArgumentMatchers.anyList());
        RuntimeException runtimeException = new RuntimeException();
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        ((SinkTask) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.sinkTask)).close((Collection) ArgumentMatchers.any(Collection.class));
        Assert.assertEquals(runtimeException, (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.workerTask.execute();
        }));
        ((KafkaConsumer) Mockito.verify(this.consumer)).wakeup();
    }

    @Test
    public void testSuppressCloseErrors() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation(null, new RecordHeaders());
        RuntimeException runtimeException = new RuntimeException();
        RuntimeException runtimeException2 = new RuntimeException();
        ((SinkTask) Mockito.doNothing().doThrow(new Throwable[]{runtimeException}).when(this.sinkTask)).put(ArgumentMatchers.anyList());
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        ((SinkTask) Mockito.doThrow(new Throwable[]{runtimeException2}).when(this.sinkTask)).close((Collection) ArgumentMatchers.any(Collection.class));
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        RuntimeException runtimeException3 = (RuntimeException) Assert.assertThrows(ConnectException.class, () -> {
            this.workerTask.execute();
        });
        Assert.assertEquals("Exception from put should be the cause", runtimeException, runtimeException3.getCause());
        Assert.assertTrue("Exception from close should be suppressed", runtimeException3.getSuppressed().length > 0);
        Assert.assertEquals(runtimeException2, runtimeException3.getSuppressed()[0]);
    }

    @Test
    public void testTaskCancelPreventsFinalOffsetCommit() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation(null, new RecordHeaders());
        ((SinkTask) Mockito.doNothing().doNothing().doAnswer(invocationOnMock -> {
            this.workerTask.stop();
            this.workerTask.cancel();
            return null;
        }).when(this.sinkTask)).put(ArgumentMatchers.anyList());
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(47L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        this.workerTask.execute();
        ((KafkaConsumer) Mockito.verify(this.consumer)).wakeup();
        ((SinkTask) Mockito.verify(this.sinkTask)).close((Collection) ArgumentMatchers.any());
    }

    @Test
    public void testCommitWithOutOfOrderCallback() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        Answer answer = invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        };
        expectTaskGetTopic();
        expectConversionAndTransformation(null, new RecordHeaders());
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        ArrayList arrayList = new ArrayList(INITIAL_ASSIGNMENT);
        List asList = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION, hashMap.get(TOPIC_PARTITION));
        hashMap2.put(TOPIC_PARTITION2, hashMap.get(TOPIC_PARTITION2));
        hashMap2.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(TOPIC_PARTITION, new OffsetAndMetadata(48L));
        hashMap3.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        hashMap3.put(TOPIC_PARTITION3, new OffsetAndMetadata(47L));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        AtomicReference atomicReference = new AtomicReference();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        ((KafkaConsumer) Mockito.doAnswer(invocationOnMock2 -> {
            Map map = (Map) invocationOnMock2.getArgument(0);
            OffsetCommitCallback offsetCommitCallback = (OffsetCommitCallback) invocationOnMock2.getArgument(1);
            atomicReference.set(() -> {
                offsetCommitCallback.onComplete(map, (Exception) null);
                atomicBoolean.set(true);
            });
            return null;
        }).when(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap), (OffsetCommitCallback) ArgumentMatchers.any(OffsetCommitCallback.class));
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        Answer answer2 = invocationOnMock3 -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsRevoked(arrayList);
            HashMap hashMap4 = new HashMap();
            hashMap4.put(TOPIC_PARTITION, Long.valueOf(((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION)).offset()));
            hashMap4.put(TOPIC_PARTITION2, Long.valueOf(((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION2)).offset()));
            hashMap4.put(TOPIC_PARTITION3, Long.valueOf(((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION3)).offset()));
            ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).offset(hashMap4);
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(asList);
            atomicBoolean2.set(true);
            ((Runnable) atomicReference.get()).run();
            TimestampType timestampType = TimestampType.NO_TIMESTAMP_TYPE;
            ArrayList arrayList2 = new ArrayList();
            arrayList2.add(new ConsumerRecord(TOPIC, 12, FIRST_OFFSET + this.recordsReturnedTp1 + 1, -1L, timestampType, 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
            arrayList2.add(new ConsumerRecord(TOPIC, PARTITION3, FIRST_OFFSET + this.recordsReturnedTp3 + 1, -1L, timestampType, 0, 0, RAW_KEY, RAW_VALUE, new RecordHeaders(), Optional.empty()));
            this.recordsReturnedTp1++;
            this.recordsReturnedTp3++;
            return new ConsumerRecords(Collections.singletonMap(new TopicPartition(TOPIC, 12), arrayList2));
        };
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        long offset = ((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION)).offset();
        long offset2 = ((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION2)).offset();
        long offset3 = ((OffsetAndMetadata) hashMap2.get(TOPIC_PARTITION3)).offset();
        Mockito.when(this.sinkTask.preCommit(hashMap3)).thenReturn(hashMap3);
        Mockito.when(this.consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(INITIAL_ASSIGNMENT).thenReturn(new HashSet(asList)).thenReturn(new HashSet(asList)).thenReturn(new HashSet(asList)).thenReturn(new HashSet(asList)).thenReturn(new HashSet(asList));
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION))).thenReturn(Long.valueOf(FIRST_OFFSET)).thenReturn(Long.valueOf(offset));
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION2))).thenReturn(Long.valueOf(FIRST_OFFSET)).thenReturn(Long.valueOf(offset2));
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION3))).thenReturn(Long.valueOf(offset3));
        Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenAnswer(answer).thenAnswer(expectConsumerPoll(1)).thenAnswer(answer2).thenAnswer(expectConsumerPoll(1));
        this.workerTask.iteration();
        this.time.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        this.workerTask.iteration();
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        assertSinkMetricValue("partition-count", 3.0d);
        assertSinkMetricValue("sink-record-read-total", 3.0d);
        assertSinkMetricValue("sink-record-send-total", 3.0d);
        assertSinkMetricValue("sink-record-active-count", 4.0d);
        assertSinkMetricValue("sink-record-active-count-max", 4.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.71429d);
        assertSinkMetricValue("offset-commit-seq-no", 2.0d);
        assertSinkMetricValue("offset-commit-completion-total", 1.0d);
        assertSinkMetricValue("offset-commit-skip-total", 1.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 2.0d);
        assertTaskMetricValue("batch-size-avg", 1.0d);
        assertTaskMetricValue("offset-commit-max-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 1.0d);
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(atomicBoolean2.get());
        Assert.assertEquals(hashMap3, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap2, this.workerTask.lastCommittedOffsets());
        ((SinkTask) Mockito.verify(this.sinkTask)).close(new ArrayList(hashMap.keySet()));
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitSync(ArgumentMatchers.anyMap());
        ((SinkTask) Mockito.verify(this.sinkTask)).open(asList);
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION, offset);
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION2, offset2);
        ((KafkaConsumer) Mockito.verify(this.consumer)).seek(TOPIC_PARTITION3, offset3);
        this.time.sleep(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS);
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap3), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap3, (Exception) null);
        Assert.assertEquals(hashMap3, this.workerTask.currentOffsets());
        Assert.assertEquals(hashMap3, this.workerTask.lastCommittedOffsets());
        assertSinkMetricValue("partition-count", 3.0d);
        assertSinkMetricValue("sink-record-read-total", 4.0d);
        assertSinkMetricValue("sink-record-send-total", 4.0d);
        assertSinkMetricValue("sink-record-active-count", 0.0d);
        assertSinkMetricValue("sink-record-active-count-max", 4.0d);
        assertSinkMetricValue("sink-record-active-count-avg", 0.5555555d);
        assertSinkMetricValue("offset-commit-seq-no", 3.0d);
        assertSinkMetricValue("offset-commit-completion-total", 2.0d);
        assertSinkMetricValue("offset-commit-skip-total", 1.0d);
        assertTaskMetricValue("status", "running");
        assertTaskMetricValue("running-ratio", 1.0d);
        assertTaskMetricValue("pause-ratio", 0.0d);
        assertTaskMetricValue("batch-size-max", 2.0d);
        assertTaskMetricValue("batch-size-avg", 1.0d);
        assertTaskMetricValue("offset-commit-max-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-avg-time-ms", 0.0d);
        assertTaskMetricValue("offset-commit-failure-percentage", 0.0d);
        assertTaskMetricValue("offset-commit-success-percentage", 1.0d);
    }

    @Test
    public void testDeliveryWithMutatingTransform() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectTaskGetTopic();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1)).thenAnswer(expectConsumerPoll(0));
        expectConversionAndTransformation("newtopic_", new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        hashMap.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
        Mockito.when(this.sinkTask.preCommit(hashMap)).thenReturn(hashMap);
        ((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).requestCommit();
        Assert.assertTrue(((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).isCommitRequested());
        Assert.assertNotEquals(hashMap, this.workerTask.lastCommittedOffsets());
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(OffsetCommitCallback.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).commitAsync((Map) ArgumentMatchers.eq(hashMap), (OffsetCommitCallback) forClass.capture());
        ((OffsetCommitCallback) forClass.getValue()).onComplete(hashMap, (Exception) null);
        Assert.assertFalse(((WorkerSinkTaskContext) this.sinkTaskContext.getValue()).isCommitRequested());
        Assert.assertEquals(hashMap, this.workerTask.lastCommittedOffsets());
        Assert.assertEquals(0L, this.workerTask.commitFailures());
        Assert.assertEquals(1.0d, this.metrics.currentMetricValueAsDouble(this.workerTask.taskMetricsGroup().metricGroup(), "batch-size-max"), 1.0E-4d);
    }

    @Test
    public void testMissingTimestampPropagation() {
        createTask(this.initialState);
        expectTaskGetTopic();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1, -1L, TimestampType.CREATE_TIME, new RecordHeaders()));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put((Collection) forClass.capture());
        SinkRecord sinkRecord = (SinkRecord) ((Collection) forClass.getValue()).iterator().next();
        Assert.assertNull(sinkRecord.timestamp());
        Assert.assertEquals(TimestampType.CREATE_TIME, sinkRecord.timestampType());
    }

    @Test
    public void testTimestampPropagation() {
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        TimestampType timestampType = TimestampType.CREATE_TIME;
        createTask(this.initialState);
        expectTaskGetTopic();
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1, valueOf.longValue(), timestampType, new RecordHeaders()));
        expectConversionAndTransformation(null, new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put((Collection) forClass.capture());
        SinkRecord sinkRecord = (SinkRecord) ((Collection) forClass.getValue()).iterator().next();
        Assert.assertEquals(valueOf, sinkRecord.timestamp());
        Assert.assertEquals(timestampType, sinkRecord.timestampType());
    }

    @Test
    public void testTopicsRegex() {
        HashMap hashMap = new HashMap(TASK_PROPS);
        hashMap.remove("topics");
        hashMap.put("topics.regex", "te.*");
        TaskConfig taskConfig = new TaskConfig(hashMap);
        createTask(TargetState.PAUSED);
        this.workerTask.initialize(taskConfig);
        this.workerTask.initializeAndStart();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Pattern.class);
        ((KafkaConsumer) Mockito.verify(this.consumer)).subscribe((Pattern) forClass.capture(), (ConsumerRebalanceListener) this.rebalanceListener.capture());
        Assert.assertEquals("te.*", ((Pattern) forClass.getValue()).pattern());
        ((SinkTask) Mockito.verify(this.sinkTask)).initialize((SinkTaskContext) this.sinkTaskContext.capture());
        ((SinkTask) Mockito.verify(this.sinkTask)).start(hashMap);
        expectPollInitialAssignment();
        this.workerTask.iteration();
        this.time.sleep(10000L);
        ((KafkaConsumer) Mockito.verify(this.consumer)).pause(INITIAL_ASSIGNMENT);
    }

    @Test
    public void testMetricsGroup() {
        WorkerSinkTask.SinkTaskMetricsGroup sinkTaskMetricsGroup = new WorkerSinkTask.SinkTaskMetricsGroup(this.taskId, this.metrics);
        WorkerSinkTask.SinkTaskMetricsGroup sinkTaskMetricsGroup2 = new WorkerSinkTask.SinkTaskMetricsGroup(this.taskId1, this.metrics);
        for (int i = 0; i != 10; i++) {
            sinkTaskMetricsGroup.recordRead(1);
            sinkTaskMetricsGroup.recordSend(2);
            sinkTaskMetricsGroup.recordPut(3L);
            sinkTaskMetricsGroup.recordPartitionCount(4);
            sinkTaskMetricsGroup.recordOffsetSequenceNumber(5);
        }
        HashMap hashMap = new HashMap();
        hashMap.put(TOPIC_PARTITION, new OffsetAndMetadata(46L));
        sinkTaskMetricsGroup.recordCommittedOffsets(hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(TOPIC_PARTITION, new OffsetAndMetadata(55L));
        sinkTaskMetricsGroup.recordConsumedOffsets(hashMap2);
        for (int i2 = 0; i2 != 20; i2++) {
            sinkTaskMetricsGroup2.recordRead(1);
            sinkTaskMetricsGroup2.recordSend(2);
            sinkTaskMetricsGroup2.recordPut(30L);
            sinkTaskMetricsGroup2.recordPartitionCount(40);
            sinkTaskMetricsGroup2.recordOffsetSequenceNumber(50);
        }
        HashMap hashMap3 = new HashMap();
        hashMap3.put(TOPIC_PARTITION2, new OffsetAndMetadata(47L));
        hashMap3.put(TOPIC_PARTITION3, new OffsetAndMetadata(48L));
        sinkTaskMetricsGroup2.recordCommittedOffsets(hashMap3);
        HashMap hashMap4 = new HashMap();
        hashMap4.put(TOPIC_PARTITION2, new OffsetAndMetadata(65L));
        hashMap4.put(TOPIC_PARTITION3, new OffsetAndMetadata(75L));
        sinkTaskMetricsGroup2.recordConsumedOffsets(hashMap4);
        Assert.assertEquals(0.333d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "sink-record-read-rate"), 0.001d);
        Assert.assertEquals(0.667d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "sink-record-send-rate"), 0.001d);
        Assert.assertEquals(9.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "sink-record-active-count"), 0.001d);
        Assert.assertEquals(4.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "partition-count"), 0.001d);
        Assert.assertEquals(5.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "offset-commit-seq-no"), 0.001d);
        Assert.assertEquals(3.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup.metricGroup(), "put-batch-max-time-ms"), 0.001d);
        sinkTaskMetricsGroup.close();
        Iterator it = sinkTaskMetricsGroup.metricGroup().metrics().metrics().keySet().iterator();
        while (it.hasNext()) {
            Assert.assertFalse(sinkTaskMetricsGroup.metricGroup().groupId().includes((MetricName) it.next()));
        }
        Assert.assertNull(sinkTaskMetricsGroup.metricGroup().metrics().getSensor("source-record-poll"));
        Assert.assertNull(sinkTaskMetricsGroup.metricGroup().metrics().getSensor("source-record-write"));
        Assert.assertNull(sinkTaskMetricsGroup.metricGroup().metrics().getSensor("poll-batch-time"));
        Assert.assertEquals(0.667d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "sink-record-read-rate"), 0.001d);
        Assert.assertEquals(1.333d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "sink-record-send-rate"), 0.001d);
        Assert.assertEquals(45.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "sink-record-active-count"), 0.001d);
        Assert.assertEquals(40.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "partition-count"), 0.001d);
        Assert.assertEquals(50.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "offset-commit-seq-no"), 0.001d);
        Assert.assertEquals(30.0d, this.metrics.currentMetricValueAsDouble(sinkTaskMetricsGroup2.metricGroup(), "put-batch-max-time-ms"), 0.001d);
    }

    @Test
    public void testHeaders() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        RecordHeaders recordHeaders = new RecordHeaders();
        recordHeaders.add("header_key", "header_value".getBytes());
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1, recordHeaders));
        expectConversionAndTransformation(null, recordHeaders);
        this.workerTask.iteration();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put((Collection) forClass.capture());
        Assert.assertEquals(1L, ((Collection) forClass.getValue()).size());
        Assert.assertEquals("header_value", ((SinkRecord) ((Collection) forClass.getValue()).iterator().next()).headers().lastWithName("header_key").value());
    }

    @Test
    public void testHeadersWithCustomConverter() {
        StringConverter stringConverter = new StringConverter();
        createTask(this.initialState, stringConverter, new SampleConverterWithHeaders(), stringConverter);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        String str = "a";
        String str2 = "Árvíztűrő tükörfúrógép";
        RecordHeaders recordHeaders = new RecordHeaders();
        String str3 = "latin2";
        recordHeaders.add("encoding", "latin2".getBytes());
        String str4 = "b";
        String str5 = "Тестовое сообщение";
        RecordHeaders recordHeaders2 = new RecordHeaders();
        String str6 = "koi8_r";
        recordHeaders2.add("encoding", "koi8_r".getBytes());
        expectPollInitialAssignment().thenAnswer(invocationOnMock -> {
            return new ConsumerRecords(Collections.singletonMap(new TopicPartition(TOPIC, 12), Arrays.asList(new ConsumerRecord(TOPIC, 12, FIRST_OFFSET + this.recordsReturnedTp1 + 1, -1L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, str.getBytes(), str2.getBytes(str3), recordHeaders, Optional.empty()), new ConsumerRecord(TOPIC, 12, FIRST_OFFSET + this.recordsReturnedTp1 + 2, -1L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, str4.getBytes(), str5.getBytes(str6), recordHeaders2, Optional.empty()))));
        });
        expectTransformation(null);
        this.workerTask.iteration();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put((Collection) forClass.capture());
        Iterator it = ((Collection) forClass.getValue()).iterator();
        SinkRecord sinkRecord = (SinkRecord) it.next();
        Assert.assertEquals("a", sinkRecord.key());
        Assert.assertEquals("Árvíztűrő tükörfúrógép", sinkRecord.value());
        SinkRecord sinkRecord2 = (SinkRecord) it.next();
        Assert.assertEquals("b", sinkRecord2.key());
        Assert.assertEquals("Тестовое сообщение", sinkRecord2.value());
    }

    @Test
    public void testOriginalTopicWithTopicMutatingTransformations() {
        createTask(this.initialState);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        verifyInitializeTask();
        expectPollInitialAssignment().thenAnswer(expectConsumerPoll(1));
        expectConversionAndTransformation("newtopic_", new RecordHeaders());
        this.workerTask.iteration();
        this.workerTask.iteration();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ((SinkTask) Mockito.verify(this.sinkTask, Mockito.times(2))).put((Collection) forClass.capture());
        Assert.assertEquals(1L, ((Collection) forClass.getValue()).size());
        SinkRecord sinkRecord = (SinkRecord) ((Collection) forClass.getValue()).iterator().next();
        Assert.assertEquals(TOPIC, sinkRecord.originalTopic());
        Assert.assertEquals("newtopic_test", sinkRecord.topic());
    }

    @Test
    public void testPartitionCountInCaseOfPartitionRevocation() {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        this.workerTask = new WorkerSinkTask(this.taskId, this.sinkTask, this.statusListener, TargetState.PAUSED, this.workerConfig, ClusterConfigState.EMPTY, this.metrics, this.keyConverter, this.valueConverter, this.errorHandlingMetrics, this.headerConverter, this.transformationChain, mockConsumer, this.pluginLoader, this.time, RetryWithToleranceOperatorTest.noopOperator(), (WorkerErrantRecordReporter) null, this.statusBackingStore, Collections::emptyList);
        mockConsumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() { // from class: org.apache.kafka.connect.runtime.WorkerSinkTaskTest.1
            {
                put(WorkerSinkTaskTest.TOPIC_PARTITION, 0L);
                put(WorkerSinkTaskTest.TOPIC_PARTITION2, 0L);
            }
        });
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.initializeAndStart();
        mockConsumer.rebalance(INITIAL_ASSIGNMENT);
        assertSinkMetricValue("partition-count", 2.0d);
        mockConsumer.rebalance(Collections.singleton(TOPIC_PARTITION2));
        assertSinkMetricValue("partition-count", 1.0d);
        this.workerTask.close();
        assertSinkMetricValue("partition-count", 0.0d);
    }

    private void expectRebalanceRevocationError(RuntimeException runtimeException) {
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        ((SinkTask) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.sinkTask)).close(INITIAL_ASSIGNMENT);
    }

    private void expectRebalanceAssignmentError(RuntimeException runtimeException) {
        Mockito.when(this.sinkTask.preCommit(ArgumentMatchers.anyMap())).thenReturn(Collections.emptyMap());
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION))).thenReturn(Long.valueOf(FIRST_OFFSET));
        Mockito.when(Long.valueOf(this.consumer.position(TOPIC_PARTITION2))).thenReturn(Long.valueOf(FIRST_OFFSET));
        ((SinkTask) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.sinkTask)).open(INITIAL_ASSIGNMENT);
    }

    private void verifyInitializeTask() {
        ((KafkaConsumer) Mockito.verify(this.consumer)).subscribe((Collection) ArgumentMatchers.eq(Collections.singletonList(TOPIC)), (ConsumerRebalanceListener) this.rebalanceListener.capture());
        ((SinkTask) Mockito.verify(this.sinkTask)).initialize((SinkTaskContext) this.sinkTaskContext.capture());
        ((SinkTask) Mockito.verify(this.sinkTask)).start(TASK_PROPS);
    }

    private OngoingStubbing<ConsumerRecords<byte[], byte[]>> expectPollInitialAssignment() {
        Mockito.when(this.consumer.assignment()).thenReturn(INITIAL_ASSIGNMENT);
        INITIAL_ASSIGNMENT.forEach(topicPartition -> {
            Mockito.when(Long.valueOf(this.consumer.position(topicPartition))).thenReturn(Long.valueOf(FIRST_OFFSET));
        });
        return Mockito.when(this.consumer.poll((Duration) ArgumentMatchers.any(Duration.class))).thenAnswer(invocationOnMock -> {
            ((ConsumerRebalanceListener) this.rebalanceListener.getValue()).onPartitionsAssigned(INITIAL_ASSIGNMENT);
            return ConsumerRecords.empty();
        });
    }

    private void verifyPollInitialAssignment() {
        ((SinkTask) Mockito.verify(this.sinkTask)).open(INITIAL_ASSIGNMENT);
        ((KafkaConsumer) Mockito.verify(this.consumer, Mockito.atLeastOnce())).assignment();
        ((SinkTask) Mockito.verify(this.sinkTask)).put(Collections.emptyList());
    }

    private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(int i) {
        return expectConsumerPoll(i, -1L, TimestampType.NO_TIMESTAMP_TYPE, new RecordHeaders());
    }

    private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(int i, Headers headers) {
        return expectConsumerPoll(i, -1L, TimestampType.NO_TIMESTAMP_TYPE, headers);
    }

    private Answer<ConsumerRecords<byte[], byte[]>> expectConsumerPoll(int i, long j, TimestampType timestampType, Headers headers) {
        return invocationOnMock -> {
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(new ConsumerRecord(TOPIC, 12, FIRST_OFFSET + this.recordsReturnedTp1 + i2, j, timestampType, 0, 0, RAW_KEY, RAW_VALUE, headers, Optional.empty()));
            }
            this.recordsReturnedTp1 += i;
            return new ConsumerRecords(i > 0 ? Collections.singletonMap(new TopicPartition(TOPIC, 12), arrayList) : Collections.emptyMap());
        };
    }

    private void expectConversionAndTransformation(String str, Headers headers) {
        Mockito.when(this.keyConverter.toConnectData(TOPIC, headers, RAW_KEY)).thenReturn(new SchemaAndValue(KEY_SCHEMA, 12));
        Mockito.when(this.valueConverter.toConnectData(TOPIC, headers, RAW_VALUE)).thenReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
        Iterator it = headers.iterator();
        while (it.hasNext()) {
            Header header = (Header) it.next();
            Mockito.when(this.headerConverter.toConnectHeader(TOPIC, header.key(), header.value())).thenReturn(new SchemaAndValue(VALUE_SCHEMA, new String(header.value())));
        }
        expectTransformation(str);
    }

    private void expectTransformation(String str) {
        Mockito.when(this.transformationChain.apply((ProcessingContext) ArgumentMatchers.any(ProcessingContext.class), (ConnectRecord) ArgumentMatchers.any(SinkRecord.class))).thenAnswer(invocationOnMock -> {
            SinkRecord sinkRecord = (SinkRecord) invocationOnMock.getArgument(1);
            return (str == null || str.isEmpty()) ? sinkRecord : sinkRecord.newRecord(str + sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.keySchema(), sinkRecord.key(), sinkRecord.valueSchema(), sinkRecord.value(), sinkRecord.timestamp(), sinkRecord.headers());
        });
    }

    private void expectTaskGetTopic() {
        Mockito.when(this.statusBackingStore.getTopic(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenAnswer(invocationOnMock -> {
            return new TopicStatus((String) invocationOnMock.getArgument(1, String.class), new ConnectorTaskId((String) invocationOnMock.getArgument(0, String.class), 0), Time.SYSTEM.milliseconds());
        });
    }

    private void assertSinkMetricValue(String str, double d) {
        Assert.assertEquals(d, this.metrics.currentMetricValueAsDouble(this.workerTask.sinkTaskMetricsGroup().metricGroup(), str), 0.001d);
    }

    private void assertTaskMetricValue(String str, double d) {
        Assert.assertEquals(d, this.metrics.currentMetricValueAsDouble(this.workerTask.taskMetricsGroup().metricGroup(), str), 0.001d);
    }

    private void assertTaskMetricValue(String str, String str2) {
        Assert.assertEquals(str2, this.metrics.currentMetricValueAsString(this.workerTask.taskMetricsGroup().metricGroup(), str));
    }

    static {
        TASK_PROPS.put("topics", TOPIC);
        TASK_PROPS.put("task.class", SinkTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
    }
}
