package org.apache.kafka.connect.runtime;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.integration.MonitorableSourceConnector;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.ParameterizedTest;
import org.apache.kafka.connect.util.ThreadedTest;
import org.apache.kafka.connect.util.TopicAdmin;
import org.apache.kafka.connect.util.TopicCreationGroup;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.IExpectationSetters;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.junit4.PowerMockRunnerDelegate;

@RunWith(PowerMockRunner.class)
@PowerMockRunnerDelegate(ParameterizedTest.class)
@PowerMockIgnore({"javax.management.*", "org.apache.log4j.*"})
/* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.class */
public class ExactlyOnceWorkerSourceTaskTest extends ThreadedTest {
    private static final String TOPIC = "topic";
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
    private WorkerConfig config;
    private SourceConnectorConfig sourceConfig;
    private Plugins plugins;
    private MockConnectMetrics metrics;
    private Time time;
    private CountDownLatch pollLatch;

    @Mock
    private SourceTask sourceTask;

    @Mock
    private Converter keyConverter;

    @Mock
    private Converter valueConverter;

    @Mock
    private HeaderConverter headerConverter;

    @Mock
    private TransformationChain<SourceRecord> transformationChain;

    @Mock
    private KafkaProducer<byte[], byte[]> producer;

    @Mock
    private TopicAdmin admin;

    @Mock
    private CloseableOffsetStorageReader offsetReader;

    @Mock
    private OffsetStorageWriter offsetWriter;

    @Mock
    private ClusterConfigState clusterConfigState;
    private ExactlyOnceWorkerSourceTask workerTask;

    @Mock
    private Future<RecordMetadata> sendFuture;

    @MockStrict
    private TaskStatus.Listener statusListener;

    @Mock
    private StatusBackingStore statusBackingStore;

    @Mock
    private ConnectorOffsetBackingStore offsetStore;

    @Mock
    private Runnable preProducerCheck;

    @Mock
    private Runnable postProducerCheck;
    private Capture<Callback> producerCallbacks;
    private static final TaskConfig TASK_CONFIG;
    private static final SourceRecord SOURCE_RECORD;
    private static final List<SourceRecord> RECORDS;
    private final boolean enableTopicCreation;
    private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
    private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
    private static final Integer KEY = -1;
    private static final Schema RECORD_SCHEMA = Schema.INT64_SCHEMA;
    private static final Long RECORD = 12L;
    private static final byte[] SERIALIZED_KEY = "converted-key".getBytes();
    private static final byte[] SERIALIZED_RECORD = "converted-record".getBytes();
    private static final Map<String, String> TASK_PROPS = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest$FlushOutcome.class */
    public enum FlushOutcome {
        SUCCEED,
        SUCCEED_ANY_TIMES,
        FAIL_FLUSH_CALLBACK,
        FAIL_TRANSACTION_COMMIT
    }

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest$MockedMethodCall.class */
    public interface MockedMethodCall {
        void invoke() throws Exception;
    }

    /* loaded from: input_file:org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest$TestSourceTask.class */
    private static abstract class TestSourceTask extends SourceTask {
        private TestSourceTask() {
        }
    }

    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(false, true);
    }

    public ExactlyOnceWorkerSourceTaskTest(boolean z) {
        this.enableTopicCreation = z;
    }

    @Override // org.apache.kafka.connect.util.ThreadedTest
    public void setup() {
        super.setup();
        Map<String, String> workerProps = workerProps();
        this.plugins = new Plugins(workerProps);
        this.config = new StandaloneConfig(workerProps);
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorProps(), true);
        this.producerCallbacks = EasyMock.newCapture();
        this.metrics = new MockConnectMetrics();
        this.time = Time.SYSTEM;
        EasyMock.expect(this.offsetStore.primaryOffsetsTopic()).andStubReturn("offsets-topic");
        this.pollLatch = new CountDownLatch(1);
    }

    private Map<String, String> workerProps() {
        HashMap hashMap = new HashMap();
        hashMap.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
        hashMap.put("internal.key.converter.schemas.enable", "false");
        hashMap.put("internal.value.converter.schemas.enable", "false");
        hashMap.put("offset.storage.file.filename", "/tmp/connect.offsets");
        hashMap.put("topic.creation.enable", String.valueOf(this.enableTopicCreation));
        return hashMap;
    }

    private Map<String, String> sourceConnectorProps() {
        return sourceConnectorProps(SourceTask.TransactionBoundary.DEFAULT);
    }

    private Map<String, String> sourceConnectorProps(SourceTask.TransactionBoundary transactionBoundary) {
        HashMap hashMap = new HashMap();
        hashMap.put("name", "foo-connector");
        hashMap.put("connector.class", MonitorableSourceConnector.class.getSimpleName());
        hashMap.put("tasks.max", String.valueOf(1));
        hashMap.put("topic", "topic");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", StringConverter.class.getName());
        hashMap.put("topic.creation.groups", String.join(",", "foo", "bar"));
        hashMap.put("topic.creation.default.replication.factor", String.valueOf(1));
        hashMap.put("topic.creation.default.partitions", String.valueOf(1));
        hashMap.put("transaction.boundary", transactionBoundary.toString());
        hashMap.put("topic.creation.foo.include", "topic");
        hashMap.put("topic.creation.bar.include", ".*");
        hashMap.put("topic.creation.bar.exclude", "topic");
        return hashMap;
    }

    @After
    public void tearDown() {
        if (this.metrics != null) {
            this.metrics.stop();
        }
    }

    private void createWorkerTask() {
        createWorkerTask(TargetState.STARTED);
    }

    private void createWorkerTask(TargetState targetState) {
        createWorkerTask(targetState, this.keyConverter, this.valueConverter, this.headerConverter);
    }

    private void createWorkerTask(TargetState targetState, Converter converter, Converter converter2, HeaderConverter headerConverter) {
        this.workerTask = new ExactlyOnceWorkerSourceTask(this.taskId, this.sourceTask, this.statusListener, targetState, converter, converter2, headerConverter, this.transformationChain, this.producer, this.admin, TopicCreationGroup.configuredGroups(this.sourceConfig), this.offsetReader, this.offsetWriter, this.offsetStore, this.config, this.clusterConfigState, this.metrics, this.plugins.delegatingLoader(), this.time, RetryWithToleranceOperatorTest.NOOP_OPERATOR, this.statusBackingStore, this.sourceConfig, (v0) -> {
            v0.run();
        }, this.preProducerCheck, this.postProducerCheck);
    }

    @Test
    public void testStartPaused() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        createWorkerTask(TargetState.PAUSED);
        expectCall(() -> {
            this.statusListener.onPause(this.taskId);
        }).andAnswer(() -> {
            countDownLatch.countDown();
            return null;
        });
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(false);
        expectClose();
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        PowerMock.verifyAll();
    }

    @Test
    public void testPause() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        this.pollLatch = new CountDownLatch(10);
        expectPolls(atomicInteger);
        expectAnyFlushes(atomicInteger2);
        expectTopicCreation("topic");
        expectCall(() -> {
            this.statusListener.onPause(this.taskId);
        });
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        this.workerTask.transitionTo(TargetState.PAUSED);
        int i = atomicInteger.get();
        Thread.sleep(100L);
        Assert.assertTrue(atomicInteger.get() - i <= 1);
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        Assert.assertEquals("Task should have flushed offsets for every record poll, once on pause, and once for end-of-life offset commit", atomicInteger2.get(), atomicInteger.get() + 2);
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInPreProducerCheck() {
        createWorkerTask();
        ConnectException connectException = new ConnectException("Failed to perform zombie fencing");
        Runnable runnable = this.preProducerCheck;
        runnable.getClass();
        expectCall(runnable::run).andThrow(connectException);
        expectCall(() -> {
            this.statusListener.onFailure(this.taskId, connectException);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.run();
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInOffsetStoreStart() {
        createWorkerTask();
        Runnable runnable = this.preProducerCheck;
        runnable.getClass();
        expectCall(runnable::run);
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::initTransactions);
        Runnable runnable2 = this.postProducerCheck;
        runnable2.getClass();
        expectCall(runnable2::run);
        ConnectException connectException = new ConnectException("No soup for you!");
        ConnectorOffsetBackingStore connectorOffsetBackingStore = this.offsetStore;
        connectorOffsetBackingStore.getClass();
        expectCall(connectorOffsetBackingStore::start).andThrow(connectException);
        expectCall(() -> {
            this.statusListener.onFailure(this.taskId, connectException);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.run();
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInProducerInitialization() {
        createWorkerTask();
        Runnable runnable = this.preProducerCheck;
        runnable.getClass();
        expectCall(runnable::run);
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::initTransactions);
        ConnectException connectException = new ConnectException("You can't do that!");
        Runnable runnable2 = this.postProducerCheck;
        runnable2.getClass();
        expectCall(runnable2::run).andThrow(connectException);
        expectCall(() -> {
            this.statusListener.onFailure(this.taskId, connectException);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.run();
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInPostProducerCheck() {
        createWorkerTask();
        Runnable runnable = this.preProducerCheck;
        runnable.getClass();
        expectCall(runnable::run);
        ConnectException connectException = new ConnectException("New task configs for the connector have already been generated");
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::initTransactions).andThrow(connectException);
        expectCall(() -> {
            this.statusListener.onFailure(this.taskId, connectException);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        this.workerTask.run();
        PowerMock.verifyAll();
    }

    @Test
    public void testPollsInBackground() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        this.pollLatch = new CountDownLatch(10);
        expectPolls(atomicInteger);
        expectAnyFlushes(atomicInteger2);
        expectTopicCreation("topic");
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(10);
        assertTransactionMetrics(1);
        Assert.assertEquals("Task should have flushed offsets for every record poll and for end-of-life offset commit", atomicInteger2.get(), atomicInteger.get() + 1);
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInPoll() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        EasyMock.expect(this.sourceTask.poll()).andAnswer(() -> {
            countDownLatch.countDown();
            throw runtimeException;
        });
        expectCall(() -> {
            this.statusListener.onFailure(this.taskId, runtimeException);
        });
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(countDownLatch));
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInPollAfterCancel() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        EasyMock.expect(this.sourceTask.poll()).andAnswer(() -> {
            countDownLatch.countDown();
            Assert.assertTrue(awaitLatch(countDownLatch2));
            throw runtimeException;
        });
        CloseableOffsetStorageReader closeableOffsetStorageReader = this.offsetReader;
        closeableOffsetStorageReader.getClass();
        expectCall(closeableOffsetStorageReader::close);
        expectCall(() -> {
            this.producer.close(Duration.ZERO);
        });
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(countDownLatch));
        this.workerTask.cancel();
        countDownLatch2.countDown();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        PowerMock.verifyAll();
    }

    @Test
    public void testFailureInPollAfterStop() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        RuntimeException runtimeException = new RuntimeException();
        EasyMock.expect(this.sourceTask.poll()).andAnswer(() -> {
            countDownLatch.countDown();
            Assert.assertTrue(awaitLatch(countDownLatch2));
            throw runtimeException;
        });
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(countDownLatch));
        this.workerTask.stop();
        countDownLatch2.countDown();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        PowerMock.verifyAll();
    }

    @Test
    public void testPollReturnsNoRecords() throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        CountDownLatch expectEmptyPolls = expectEmptyPolls(1, new AtomicInteger());
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(false).anyTimes();
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(expectEmptyPolls));
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(0);
        PowerMock.verifyAll();
    }

    @Test
    public void testPollBasedCommit() throws Exception {
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorProps(SourceTask.TransactionBoundary.POLL), this.enableTopicCreation);
        createWorkerTask();
        expectPreflight();
        expectStartup();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        expectPolls(atomicInteger);
        expectAnyFlushes(atomicInteger2);
        expectTopicCreation("topic");
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        Assert.assertEquals("Task should have flushed offsets for every record poll, and for end-of-life offset commit", atomicInteger2.get(), atomicInteger.get() + 1);
        assertPollMetrics(1);
        assertTransactionMetrics(1);
        PowerMock.verifyAll();
    }

    @Test
    public void testIntervalBasedCommit() throws Exception {
        Map<String, String> sourceConnectorProps = sourceConnectorProps(SourceTask.TransactionBoundary.INTERVAL);
        sourceConnectorProps.put("transaction.boundary.interval.ms", Long.toString(618L));
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorProps, this.enableTopicCreation);
        this.time = new MockTime();
        createWorkerTask();
        expectPreflight();
        expectStartup();
        expectPolls();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        CountDownLatch countDownLatch3 = new CountDownLatch(2);
        AtomicInteger atomicInteger = new AtomicInteger();
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        expectTopicCreation("topic");
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        this.pollLatch = countDownLatch;
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("No flushes should have taken place before offset commit interval has elapsed", 0L, atomicInteger.get());
        this.time.sleep(618L);
        this.pollLatch = countDownLatch2;
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("One flush should have taken place after offset commit interval has elapsed", 1L, atomicInteger.get());
        this.time.sleep(618 * 2);
        this.pollLatch = countDownLatch3;
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Two flushes should have taken place after offset commit interval has elapsed again", 2L, atomicInteger.get());
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        Assert.assertEquals("Task should have flushed offsets twice based on offset commit interval, and performed final end-of-life offset commit", 3L, atomicInteger.get());
        assertPollMetrics(2);
        PowerMock.verifyAll();
    }

    @Test
    public void testConnectorBasedCommit() throws Exception {
        this.sourceConfig = new SourceConnectorConfig(this.plugins, sourceConnectorProps(SourceTask.TransactionBoundary.CONNECTOR), this.enableTopicCreation);
        createWorkerTask();
        expectPreflight();
        expectStartup();
        expectPolls();
        List list = (List) IntStream.range(0, 7).mapToObj(i -> {
            return new CountDownLatch(3);
        }).collect(Collectors.toList());
        AtomicInteger atomicInteger = new AtomicInteger();
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::abortTransaction);
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(true);
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(true);
        KafkaProducer<byte[], byte[]> kafkaProducer2 = this.producer;
        kafkaProducer2.getClass();
        expectCall(kafkaProducer2::abortTransaction);
        expectFlush(FlushOutcome.SUCCEED, atomicInteger);
        expectTopicCreation("topic");
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        WorkerTransactionContext transactionContext = this.workerTask.sourceTaskContext.transactionContext();
        int i2 = (-1) + 1;
        this.pollLatch = (CountDownLatch) list.get(i2);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("No flushes should have taken place without connector requesting transaction commit", 0L, atomicInteger.get());
        transactionContext.commitTransaction();
        int i3 = i2 + 1;
        this.pollLatch = (CountDownLatch) list.get(i3);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("One flush should have taken place after connector requested batch commit", 1L, atomicInteger.get());
        transactionContext.commitTransaction(SOURCE_RECORD);
        int i4 = i3 + 1;
        this.pollLatch = (CountDownLatch) list.get(i4);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Two flushes should have taken place after connector requested individual record commit", 2L, atomicInteger.get());
        int i5 = i4 + 1;
        this.pollLatch = (CountDownLatch) list.get(i5);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Only two flushes should still have taken place without connector re-requesting commit, even on identical records", 2L, atomicInteger.get());
        transactionContext.abortTransaction();
        int i6 = i5 + 1;
        this.pollLatch = (CountDownLatch) list.get(i6);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Three flushes should have taken place after connector requested batch abort", 3L, atomicInteger.get());
        transactionContext.abortTransaction(SOURCE_RECORD);
        int i7 = i6 + 1;
        this.pollLatch = (CountDownLatch) list.get(i7);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Four flushes should have taken place after connector requested individual record abort", 4L, atomicInteger.get());
        this.pollLatch = (CountDownLatch) list.get(i7 + 1);
        Assert.assertTrue(awaitLatch(this.pollLatch));
        Assert.assertEquals("Only four flushes should still have taken place without connector re-requesting abort, even on identical records", 4L, atomicInteger.get());
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        Assert.assertEquals("Task should have flushed offsets four times based on connector-defined boundaries, and skipped final end-of-life offset commit", 4L, atomicInteger.get());
        assertPollMetrics(1);
        assertTransactionMetrics(2);
        PowerMock.verifyAll();
    }

    @Test
    public void testCommitFlushCallbackFailure() throws Exception {
        testCommitFailure(FlushOutcome.FAIL_FLUSH_CALLBACK);
    }

    @Test
    public void testCommitTransactionFailure() throws Exception {
        testCommitFailure(FlushOutcome.FAIL_TRANSACTION_COMMIT);
    }

    private void testCommitFailure(FlushOutcome flushOutcome) throws Exception {
        createWorkerTask();
        expectPreflight();
        expectStartup();
        expectPolls();
        expectFlush(flushOutcome);
        expectTopicCreation("topic");
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        expectCall(() -> {
            this.statusListener.onFailure((ConnectorTaskId) EasyMock.eq(this.taskId), (Throwable) EasyMock.anyObject());
        }).andAnswer(() -> {
            countDownLatch.countDown();
            return null;
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(countDownLatch));
        this.workerTask.stop();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        assertPollMetrics(1);
        PowerMock.verifyAll();
    }

    @Test
    public void testSendRecordsRetries() throws Exception {
        createWorkerTask();
        SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        SourceRecord sourceRecord3 = new SourceRecord(PARTITION, OFFSET, "topic", 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        expectTopicCreation("topic");
        expectSendRecordOnce(false);
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::beginTransaction);
        expectSendRecordSyncFailure(new TimeoutException("retriable sync failure"));
        expectSendRecordOnce(true);
        expectSendRecordOnce(false);
        PowerMock.replayAll(new Object[0]);
        this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2, sourceRecord3);
        this.workerTask.sendRecords();
        Assert.assertEquals(Arrays.asList(sourceRecord2, sourceRecord3), this.workerTask.toSend);
        this.workerTask.sendRecords();
        Assert.assertNull(this.workerTask.toSend);
        PowerMock.verifyAll();
    }

    @Test
    public void testSendRecordsProducerSendFailsImmediately() {
        if (this.enableTopicCreation) {
            createWorkerTask();
            SourceRecord sourceRecord = new SourceRecord(PARTITION, OFFSET, "topic", 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
            SourceRecord sourceRecord2 = new SourceRecord(PARTITION, OFFSET, "topic", 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
            KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
            kafkaProducer.getClass();
            expectCall(kafkaProducer::beginTransaction);
            expectTopicCreation("topic");
            expectConvertHeadersAndKeyValue("topic", true, emptyHeaders());
            expectApplyTransformationChain(false);
            EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.anyObject(), (Callback) EasyMock.anyObject())).andThrow(new KafkaException("Producer closed while send in progress", new InvalidTopicException("topic")));
            PowerMock.replayAll(new Object[0]);
            this.workerTask.toSend = Arrays.asList(sourceRecord, sourceRecord2);
            ExactlyOnceWorkerSourceTask exactlyOnceWorkerSourceTask = this.workerTask;
            exactlyOnceWorkerSourceTask.getClass();
            Assert.assertThrows(ConnectException.class, exactlyOnceWorkerSourceTask::sendRecords);
            PowerMock.verifyAll();
        }
    }

    @Test
    public void testSlowTaskStart() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        createWorkerTask();
        expectPreflight();
        expectCall(() -> {
            this.sourceTask.initialize((SourceTaskContext) EasyMock.anyObject(SourceTaskContext.class));
        });
        expectCall(() -> {
            this.sourceTask.start(TASK_PROPS);
        });
        EasyMock.expectLastCall().andAnswer(() -> {
            countDownLatch.countDown();
            Assert.assertTrue(awaitLatch(countDownLatch2));
            return null;
        });
        expectCall(() -> {
            this.statusListener.onStartup(this.taskId);
        });
        SourceTask sourceTask = this.sourceTask;
        sourceTask.getClass();
        expectCall(sourceTask::stop);
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(false);
        expectCall(() -> {
            this.statusListener.onShutdown(this.taskId);
        });
        expectClose();
        PowerMock.replayAll(new Object[0]);
        this.workerTask.initialize(TASK_CONFIG);
        Future<?> submit = this.executor.submit((Runnable) this.workerTask);
        Assert.assertTrue(awaitLatch(countDownLatch));
        this.workerTask.stop();
        countDownLatch2.countDown();
        Assert.assertTrue(this.workerTask.awaitStop(1000L));
        submit.get();
        PowerMock.verifyAll();
    }

    @Test
    public void testCancel() {
        createWorkerTask();
        CloseableOffsetStorageReader closeableOffsetStorageReader = this.offsetReader;
        closeableOffsetStorageReader.getClass();
        expectCall(closeableOffsetStorageReader::close);
        expectCall(() -> {
            this.producer.close(Duration.ZERO);
        });
        PowerMock.replayAll(new Object[0]);
        this.workerTask.cancel();
        PowerMock.verifyAll();
    }

    private TopicAdmin.TopicCreationResponse createdTopic(String str) {
        return new TopicAdmin.TopicCreationResponse(Collections.singleton(str), Collections.emptySet());
    }

    private CountDownLatch expectEmptyPolls(int i, AtomicInteger atomicInteger) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(i);
        EasyMock.expect(this.sourceTask.poll()).andStubAnswer(() -> {
            atomicInteger.incrementAndGet();
            countDownLatch.countDown();
            Thread.sleep(10L);
            return Collections.emptyList();
        });
        return countDownLatch;
    }

    private void expectPolls(AtomicInteger atomicInteger) throws Exception {
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::beginTransaction).atLeastOnce();
        EasyMock.expect(this.sourceTask.poll()).andStubAnswer(() -> {
            atomicInteger.incrementAndGet();
            this.pollLatch.countDown();
            Thread.sleep(10L);
            return RECORDS;
        });
        expectSendRecordAnyTimes();
    }

    private void expectPolls() throws Exception {
        expectPolls(new AtomicInteger());
    }

    private void expectSendRecordSyncFailure(Throwable th) {
        expectConvertHeadersAndKeyValue(false);
        expectApplyTransformationChain(false);
        this.offsetWriter.offset(PARTITION, OFFSET);
        PowerMock.expectLastCall();
        EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.anyObject(ProducerRecord.class), (Callback) EasyMock.anyObject(Callback.class))).andThrow(th);
    }

    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() {
        return expectSendRecordSendSuccess(true, false);
    }

    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean z) {
        return expectSendRecordSendSuccess(false, z);
    }

    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordSendSuccess(boolean z, boolean z2) {
        return expectSendRecord("topic", z, z2, true, true, emptyHeaders());
    }

    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(String str, boolean z, boolean z2, boolean z3, boolean z4, Headers headers) {
        if (z4) {
            expectConvertHeadersAndKeyValue(str, z, headers);
        }
        expectApplyTransformationChain(z);
        Capture<ProducerRecord<byte[], byte[]>> newCapture = EasyMock.newCapture();
        if (!z2) {
            this.offsetWriter.offset(PARTITION, OFFSET);
            if (z) {
                PowerMock.expectLastCall().anyTimes();
            } else {
                PowerMock.expectLastCall();
            }
        }
        IExpectationSetters expect = EasyMock.expect(this.producer.send((ProducerRecord) EasyMock.capture(newCapture), (Callback) EasyMock.capture(this.producerCallbacks)));
        IAnswer iAnswer = () -> {
            synchronized (this.producerCallbacks) {
                for (Callback callback : this.producerCallbacks.getValues()) {
                    if (z3) {
                        callback.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0L, 0, 0L, 0, 0), (Exception) null);
                    } else {
                        callback.onCompletion((RecordMetadata) null, new TopicAuthorizationException("foo"));
                    }
                }
                this.producerCallbacks.reset();
            }
            return this.sendFuture;
        };
        if (z) {
            expect.andStubAnswer(iAnswer);
        } else {
            expect.andAnswer(iAnswer);
        }
        if (z3) {
            expectTaskGetTopic(z);
        }
        return newCapture;
    }

    private void expectConvertHeadersAndKeyValue(boolean z) {
        expectConvertHeadersAndKeyValue("topic", z, emptyHeaders());
    }

    private void expectConvertHeadersAndKeyValue(String str, boolean z, Headers headers) {
        Iterator it = headers.iterator();
        while (it.hasNext()) {
            Header header = (Header) it.next();
            IExpectationSetters expect = EasyMock.expect(this.headerConverter.fromConnectHeader(str, header.key(), Schema.STRING_SCHEMA, new String(header.value())));
            if (z) {
                expect.andStubReturn(header.value());
            } else {
                expect.andReturn(header.value());
            }
        }
        IExpectationSetters expect2 = EasyMock.expect(this.keyConverter.fromConnectData(str, headers, KEY_SCHEMA, KEY));
        if (z) {
            expect2.andStubReturn(SERIALIZED_KEY);
        } else {
            expect2.andReturn(SERIALIZED_KEY);
        }
        IExpectationSetters expect3 = EasyMock.expect(this.valueConverter.fromConnectData(str, headers, RECORD_SCHEMA, RECORD));
        if (z) {
            expect3.andStubReturn(SERIALIZED_RECORD);
        } else {
            expect3.andReturn(SERIALIZED_RECORD);
        }
    }

    private void expectApplyTransformationChain(boolean z) {
        Capture newCapture = EasyMock.newCapture();
        IExpectationSetters expect = EasyMock.expect(this.transformationChain.apply((ConnectRecord) EasyMock.capture(newCapture)));
        if (z) {
            newCapture.getClass();
            expect.andStubAnswer(newCapture::getValue);
        } else {
            newCapture.getClass();
            expect.andAnswer(newCapture::getValue);
        }
    }

    private void expectTaskGetTopic(boolean z) {
        Capture newCapture = EasyMock.newCapture();
        Capture newCapture2 = EasyMock.newCapture();
        IExpectationSetters expect = EasyMock.expect(this.statusBackingStore.getTopic((String) EasyMock.capture(newCapture), (String) EasyMock.capture(newCapture2)));
        if (z) {
            expect.andStubAnswer(() -> {
                return new TopicStatus((String) newCapture2.getValue(), new ConnectorTaskId((String) newCapture.getValue(), 0), this.time.milliseconds());
            });
        } else {
            expect.andAnswer(() -> {
                return new TopicStatus((String) newCapture2.getValue(), new ConnectorTaskId((String) newCapture.getValue(), 0), this.time.milliseconds());
            });
        }
        if (newCapture.hasCaptured() && newCapture2.hasCaptured()) {
            Assert.assertEquals("job", newCapture.getValue());
            Assert.assertEquals("topic", newCapture2.getValue());
        }
    }

    private boolean awaitLatch(CountDownLatch countDownLatch) {
        try {
            return countDownLatch.await(ErrorHandlingTaskTest.OPERATOR_RETRY_MAX_DELAY_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            return false;
        }
    }

    private CountDownLatch expectFlush(FlushOutcome flushOutcome, AtomicInteger atomicInteger) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        IExpectationSetters andAnswer = EasyMock.expect(Boolean.valueOf(this.offsetWriter.beginFlush())).andAnswer(() -> {
            atomicInteger.incrementAndGet();
            countDownLatch.countDown();
            return true;
        });
        if (FlushOutcome.SUCCEED_ANY_TIMES.equals(flushOutcome)) {
            andAnswer.anyTimes();
        }
        Capture newCapture = EasyMock.newCapture();
        IExpectationSetters expect = EasyMock.expect(this.offsetWriter.doFlush((org.apache.kafka.connect.util.Callback) EasyMock.capture(newCapture)));
        switch (flushOutcome) {
            case SUCCEED:
                expect.andReturn((Object) null);
                KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
                kafkaProducer.getClass();
                expectCall(kafkaProducer::commitTransaction);
                expectCall(() -> {
                    this.sourceTask.commitRecord((SourceRecord) EasyMock.anyObject(), (RecordMetadata) EasyMock.anyObject());
                });
                SourceTask sourceTask = this.sourceTask;
                sourceTask.getClass();
                expectCall(sourceTask::commit);
                break;
            case SUCCEED_ANY_TIMES:
                expect.andReturn((Object) null).anyTimes();
                KafkaProducer<byte[], byte[]> kafkaProducer2 = this.producer;
                kafkaProducer2.getClass();
                expectCall(kafkaProducer2::commitTransaction).anyTimes();
                expectCall(() -> {
                    this.sourceTask.commitRecord((SourceRecord) EasyMock.anyObject(), (RecordMetadata) EasyMock.anyObject());
                }).anyTimes();
                SourceTask sourceTask2 = this.sourceTask;
                sourceTask2.getClass();
                expectCall(sourceTask2::commit).anyTimes();
                break;
            case FAIL_FLUSH_CALLBACK:
                KafkaProducer<byte[], byte[]> kafkaProducer3 = this.producer;
                kafkaProducer3.getClass();
                expectCall(kafkaProducer3::commitTransaction);
                expect.andAnswer(() -> {
                    ((org.apache.kafka.connect.util.Callback) newCapture.getValue()).onCompletion(new RecordTooLargeException(), (Object) null);
                    return null;
                });
                OffsetStorageWriter offsetStorageWriter = this.offsetWriter;
                offsetStorageWriter.getClass();
                expectCall(offsetStorageWriter::cancelFlush);
                break;
            case FAIL_TRANSACTION_COMMIT:
                expect.andReturn((Object) null);
                KafkaProducer<byte[], byte[]> kafkaProducer4 = this.producer;
                kafkaProducer4.getClass();
                expectCall(kafkaProducer4::commitTransaction).andThrow(new RecordTooLargeException());
                OffsetStorageWriter offsetStorageWriter2 = this.offsetWriter;
                offsetStorageWriter2.getClass();
                expectCall(offsetStorageWriter2::cancelFlush);
                break;
            default:
                Assert.fail("Unexpected flush outcome: " + flushOutcome);
                break;
        }
        return countDownLatch;
    }

    private CountDownLatch expectFlush(FlushOutcome flushOutcome) {
        return expectFlush(flushOutcome, new AtomicInteger());
    }

    private CountDownLatch expectAnyFlushes(AtomicInteger atomicInteger) {
        EasyMock.expect(Boolean.valueOf(this.offsetWriter.willFlush())).andReturn(true).anyTimes();
        return expectFlush(FlushOutcome.SUCCEED_ANY_TIMES, atomicInteger);
    }

    private void assertTransactionMetrics(int i) {
        ConnectMetrics.MetricGroup metricGroup = this.workerTask.transactionMetricsGroup().metricGroup();
        double currentMetricValueAsDouble = this.metrics.currentMetricValueAsDouble(metricGroup, "transaction-size-min");
        double currentMetricValueAsDouble2 = this.metrics.currentMetricValueAsDouble(metricGroup, "transaction-size-max");
        double currentMetricValueAsDouble3 = this.metrics.currentMetricValueAsDouble(metricGroup, "transaction-size-avg");
        Assert.assertTrue(currentMetricValueAsDouble >= 0.0d);
        Assert.assertTrue(currentMetricValueAsDouble2 >= ((double) i));
        if (currentMetricValueAsDouble2 - currentMetricValueAsDouble <= 1.0E-6d) {
            Assert.assertEquals(currentMetricValueAsDouble2, currentMetricValueAsDouble3, 2.0E-6d);
        } else {
            Assert.assertTrue("Average transaction size should be greater than minimum transaction size", currentMetricValueAsDouble3 > currentMetricValueAsDouble);
            Assert.assertTrue("Average transaction size should be less than maximum transaction size", currentMetricValueAsDouble3 < currentMetricValueAsDouble2);
        }
    }

    private void assertPollMetrics(int i) {
        ConnectMetrics.MetricGroup metricGroup = this.workerTask.sourceTaskMetricsGroup().metricGroup();
        ConnectMetrics.MetricGroup metricGroup2 = this.workerTask.taskMetricsGroup().metricGroup();
        double currentMetricValueAsDouble = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-poll-rate");
        double currentMetricValueAsDouble2 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-poll-total");
        if (i > 0) {
            Assert.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(metricGroup2, "batch-size-max"), 1.0E-6d);
            Assert.assertEquals(RECORDS.size(), this.metrics.currentMetricValueAsDouble(metricGroup2, "batch-size-avg"), 1.0E-6d);
            Assert.assertTrue(currentMetricValueAsDouble > 0.0d);
        } else {
            Assert.assertTrue(currentMetricValueAsDouble == 0.0d);
        }
        Assert.assertTrue(currentMetricValueAsDouble2 >= ((double) i));
        double currentMetricValueAsDouble3 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-write-rate");
        double currentMetricValueAsDouble4 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-write-total");
        if (i > 0) {
            Assert.assertTrue(currentMetricValueAsDouble3 > 0.0d);
        } else {
            Assert.assertTrue(currentMetricValueAsDouble3 == 0.0d);
        }
        Assert.assertTrue(currentMetricValueAsDouble4 >= ((double) i));
        double currentMetricValueAsDouble5 = this.metrics.currentMetricValueAsDouble(metricGroup, "poll-batch-max-time-ms");
        double currentMetricValueAsDouble6 = this.metrics.currentMetricValueAsDouble(metricGroup, "poll-batch-avg-time-ms");
        if (i > 0) {
            Assert.assertTrue(currentMetricValueAsDouble5 >= 0.0d);
        }
        Assert.assertTrue(Double.isNaN(currentMetricValueAsDouble6) || currentMetricValueAsDouble6 > 0.0d);
        double currentMetricValueAsDouble7 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-active-count");
        double currentMetricValueAsDouble8 = this.metrics.currentMetricValueAsDouble(metricGroup, "source-record-active-count-max");
        Assert.assertEquals(0.0d, currentMetricValueAsDouble7, 1.0E-6d);
        if (i > 0) {
            Assert.assertEquals(RECORDS.size(), currentMetricValueAsDouble8, 1.0E-6d);
        }
    }

    private RecordHeaders emptyHeaders() {
        return new RecordHeaders();
    }

    private static <T> IExpectationSetters<T> expectCall(MockedMethodCall mockedMethodCall) {
        try {
            mockedMethodCall.invoke();
            return EasyMock.expectLastCall();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e2) {
            throw new RuntimeException("Mocked method invocation threw a checked exception", e2);
        }
    }

    private void expectPreflight() {
        Runnable runnable = this.preProducerCheck;
        runnable.getClass();
        expectCall(runnable::run);
        KafkaProducer<byte[], byte[]> kafkaProducer = this.producer;
        kafkaProducer.getClass();
        expectCall(kafkaProducer::initTransactions);
        Runnable runnable2 = this.postProducerCheck;
        runnable2.getClass();
        expectCall(runnable2::run);
        ConnectorOffsetBackingStore connectorOffsetBackingStore = this.offsetStore;
        connectorOffsetBackingStore.getClass();
        expectCall(connectorOffsetBackingStore::start);
    }

    private void expectStartup() {
        expectCall(() -> {
            this.sourceTask.initialize((SourceTaskContext) EasyMock.anyObject(SourceTaskContext.class));
        });
        expectCall(() -> {
            this.sourceTask.start(TASK_PROPS);
        });
        expectCall(() -> {
            this.statusListener.onStartup(this.taskId);
        });
    }

    private void expectClose() {
        ConnectorOffsetBackingStore connectorOffsetBackingStore = this.offsetStore;
        connectorOffsetBackingStore.getClass();
        expectCall(connectorOffsetBackingStore::stop);
        expectCall(() -> {
            this.producer.close((Duration) EasyMock.anyObject(Duration.class));
        });
        expectCall(() -> {
            this.admin.close((Duration) EasyMock.anyObject(Duration.class));
        });
        TransformationChain<SourceRecord> transformationChain = this.transformationChain;
        transformationChain.getClass();
        expectCall(transformationChain::close);
        CloseableOffsetStorageReader closeableOffsetStorageReader = this.offsetReader;
        closeableOffsetStorageReader.getClass();
        expectCall(closeableOffsetStorageReader::close);
    }

    private void expectTopicCreation(String str) {
        if (this.config.topicCreationEnable()) {
            EasyMock.expect(this.admin.describeTopics(new String[]{str})).andReturn(Collections.emptyMap());
            EasyMock.expect(this.admin.createOrFindTopics(new NewTopic[]{(NewTopic) EasyMock.capture(EasyMock.newCapture())})).andReturn(createdTopic(str));
        }
    }

    static {
        TASK_PROPS.put("task.class", TestSourceTask.class.getName());
        TASK_CONFIG = new TaskConfig(TASK_PROPS);
        SOURCE_RECORD = new SourceRecord(PARTITION, OFFSET, "topic", (Integer) null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
        RECORDS = Collections.singletonList(SOURCE_RECORD);
    }
}
