package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveOnCloseApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.class */
public class AsyncKafkaConsumerTest {
    // Retry backoff value handed to the AsyncKafkaConsumer constructor by the seven-argument newConsumer overload.
    private long retryBackoffMs = 100;
    // Default API timeout value handed to the same constructor.
    private int defaultApiTimeoutMs = 1000;
    // Auto-commit flag handed to the same constructor.
    private boolean autoCommitEnabled = true;
    // Consumer under test; assigned by each test and closed/cleared in resetAll().
    private AsyncKafkaConsumer<String, String> consumer = null;
    // Clock shared with every consumer built here.
    // NOTE(review): the MockTime argument is presumably an auto-tick in ms — confirm against MockTime.
    private final Time time = new MockTime(1);
    // Mocked collaborators injected into the consumer so tests can stub and verify interactions.
    private final FetchCollector<String, String> fetchCollector = (FetchCollector) Mockito.mock(FetchCollector.class);
    private final ApplicationEventHandler applicationEventHandler = (ApplicationEventHandler) Mockito.mock(ApplicationEventHandler.class);
    private final ConsumerMetadata metadata = (ConsumerMetadata) Mockito.mock(ConsumerMetadata.class);
    // Background event queue passed to the consumer; emptied in resetAll().
    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest$MockCommitCallback.class */
    public static class MockCommitCallback implements OffsetCommitCallback {
        public int invoked;
        public Exception exception;
        public String completionThread;

        private MockCommitCallback() {
            this.invoked = 0;
            this.exception = null;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.invoked++;
            this.completionThread = Thread.currentThread().getName();
            this.exception = exc;
        }
    }

    /**
     * Per-test cleanup: empties the background event queue, closes the consumer (if one was created)
     * with a zero timeout, and clears Mockito's inline mocks.
     *
     * <p>The consumer reference reset and the inline-mock cleanup run in a {@code finally} block so
     * that they still happen when {@code close} throws; the exception itself still propagates.
     */
    @AfterEach
    public void resetAll() {
        this.backgroundEventQueue.clear();
        try {
            if (this.consumer != null) {
                this.consumer.close(Duration.ZERO);
            }
        } finally {
            this.consumer = null;
            Mockito.framework().clearInlineMocks();
        }
    }

    /**
     * Builds a consumer from the required properties plus {@code group.id=group-id}.
     *
     * @return a new consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumer() {
        final Properties props = requiredConsumerProperties();
        props.put("group.id", "group-id");
        final ConsumerConfig config = new ConsumerConfig(props);
        return newConsumer(config);
    }

    /**
     * Builds a consumer from the required properties only, without adding a group id here.
     *
     * @return a new consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerProperties());
        return newConsumer(config);
    }

    /**
     * Builds a consumer whose properties come from {@code requiredConsumerPropertiesAndGroupId("")}.
     *
     * @return a new consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(""));
        return newConsumer(config);
    }

    /**
     * Builds a consumer from the given config. The three factory arguments ignore their inputs and
     * hand back this test's mocked event handler, fetch collector and metadata respectively.
     *
     * @param consumerConfig configuration for the consumer
     * @return a new consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig consumerConfig) {
        return new AsyncKafkaConsumer<>(
            consumerConfig,
            new StringDeserializer(),
            new StringDeserializer(),
            time,
            (logCtx, clock, eventQueue, first, second, third) -> applicationEventHandler,
            (logCtx, md, subscriptions, fetchCfg, deser, fetchMetrics, clock) -> fetchCollector,
            (cfg, subscriptions, logCtx, listeners) -> metadata,
            backgroundEventQueue);
    }

    /**
     * Builds a consumer through the fully-injected constructor, combining the supplied collaborators
     * with this test's mocks, clock, queue and timing fields.
     *
     * <p>Callers in this class pass {@code "group-id"} for {@code str} and {@code "client-id"} for
     * {@code str2}.
     *
     * @return a new consumer
     */
    private AsyncKafkaConsumer<String, String> newConsumer(FetchBuffer fetchBuffer, ConsumerInterceptors<String, String> consumerInterceptors, ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker, SubscriptionState subscriptionState, List<ConsumerPartitionAssignor> list, String str, String str2) {
        final LogContext logContext = new LogContext();
        final Deserializers<String, String> deserializers =
            new Deserializers<>(new StringDeserializer(), new StringDeserializer());
        final Metrics metrics = new Metrics();
        return new AsyncKafkaConsumer<>(
            logContext,
            str2,
            deserializers,
            fetchBuffer,
            fetchCollector,
            consumerInterceptors,
            time,
            applicationEventHandler,
            backgroundEventQueue,
            consumerRebalanceListenerInvoker,
            metrics,
            subscriptionState,
            metadata,
            retryBackoffMs,
            defaultApiTimeoutMs,
            list,
            str,
            autoCommitEnabled);
    }

    /** Creating a consumer and closing it must not throw. */
    @Test
    public void testSuccessfulStartupShutdown() {
        consumer = newConsumer();
        Assertions.assertDoesNotThrow(() -> consumer.close());
    }

    /** Construction with an empty group id fails with a KafkaException caused by InvalidGroupIdException. */
    @Test
    public void testInvalidGroupId() {
        final KafkaException thrown =
            Assertions.assertThrows(KafkaException.class, this::newConsumerWithEmptyGroupId);
        Assertions.assertInstanceOf(InvalidGroupIdException.class, thrown.getCause());
    }

    /** Calling assignment() on a closed consumer throws IllegalStateException with the expected message. */
    @Test
    public void testFailOnClosedConsumer() {
        consumer = newConsumer();
        consumer.close();
        // A bound method reference null-checks its receiver when evaluated, so no explicit check is needed.
        final IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, consumer::assignment);
        Assertions.assertEquals("This consumer has already been closed.", thrown.getMessage());
    }

    /**
     * commitAsync with a null callback enqueues a CommitApplicationEvent carrying the given offsets,
     * and neither completing that event's future nor committing again throws.
     */
    @Test
    public void testCommitAsyncWithNullCallback() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        offsets.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));

        consumer.commitAsync(offsets, (OffsetCommitCallback) null);

        final ArgumentCaptor<CommitApplicationEvent> eventCaptor =
            ArgumentCaptor.forClass(CommitApplicationEvent.class);
        Mockito.verify(applicationEventHandler).add(eventCaptor.capture());
        final CommitApplicationEvent captured = eventCaptor.getValue();
        Assertions.assertEquals(offsets, captured.offsets());

        Assertions.assertDoesNotThrow(() -> captured.future().complete(null));
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, (OffsetCommitCallback) null));
    }

    /**
     * A user-supplied commit callback sees no exception once callbacks are forced to run.
     * NOTE(review): the no-arg completeCommitApplicationEventExceptionally() helper presumably stubs a
     * successful completion despite its name, since this test expects a null exception — confirm.
     */
    @Test
    public void testCommitAsyncUserSuppliedCallbackNoException() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitApplicationEventExceptionally();

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
        forceCommitCallbackInvocation();

        Assertions.assertNull(callback.exception);
    }

    /**
     * When the commit event is stubbed to fail with {@code exc}, the user-supplied callback receives
     * an exception of exactly the same class.
     *
     * @param exc failure supplied by {@code commitExceptionSupplier}
     */
    @ParameterizedTest
    @MethodSource("commitExceptionSupplier")
    public void testCommitAsyncUserSuppliedCallbackWithException(Exception exc) {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitApplicationEventExceptionally(exc);

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
        forceCommitCallbackInvocation();

        final Class<?> expectedType = exc.getClass();
        Assertions.assertSame(expectedType, callback.exception.getClass());
    }

    /**
     * Argument source for {@code testCommitAsyncUserSuppliedCallbackWithException}.
     *
     * <p>The elements are passed as plain varargs so the stream's element type is inferred as
     * {@code Exception}; an {@code Object[]} cast would yield a {@code Stream<Object>}, which does not
     * match the declared return type.
     *
     * @return a stream with a KafkaException and a GroupAuthorizationException
     */
    private static Stream<Exception> commitExceptionSupplier() {
        return Stream.of(
            new KafkaException("Test exception"),
            new GroupAuthorizationException("Group authorization exception"));
    }

    /**
     * After an async commit's future is completed exceptionally with the FENCED_INSTANCE_ID error,
     * the next commitAsync() call throws an exception of that error's class.
     */
    @Test
    public void testCommitAsyncWithFencedException() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));

        final ArgumentCaptor<CommitApplicationEvent> eventCaptor =
            ArgumentCaptor.forClass(CommitApplicationEvent.class);
        Mockito.verify(applicationEventHandler).add(eventCaptor.capture());
        final CommitApplicationEvent captured = eventCaptor.getValue();
        captured.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());

        Assertions.assertThrows(
            Errors.FENCED_INSTANCE_ID.exception().getClass(),
            () -> consumer.commitAsync());
    }

    /**
     * committed() returns the offsets the stubbed fetch-committed-offsets event completes with, and
     * issues a FetchCommittedOffsetsApplicationEvent through addAndGet.
     */
    @Test
    public void testCommitted() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> expected = mockTopicPartitionOffset();
        completeFetchedCommittedOffsetApplicationEventSuccessfully(expected);

        final Map<TopicPartition, OffsetAndMetadata> actual =
            consumer.committed(expected.keySet(), Duration.ofMillis(1000L));

        Assertions.assertEquals(expected, actual);
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.any());
    }

    /**
     * committed() forwards the leader epoch of each returned offset to the metadata; the partition
     * mapped to {@code null} has no such update verified.
     */
    @Test
    public void testCommittedLeaderEpochUpdate() {
        consumer = newConsumer();
        final TopicPartition first = new TopicPartition("t0", 2);
        final TopicPartition second = new TopicPartition("t0", 3);
        final TopicPartition third = new TopicPartition("t0", 4);
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(first, new OffsetAndMetadata(10L, Optional.of(2), ""));
        offsets.put(second, null);
        offsets.put(third, new OffsetAndMetadata(20L, Optional.of(3), ""));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);

        Assertions.assertDoesNotThrow(() -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000L)));

        Mockito.verify(metadata).updateLastSeenEpochIfNewer(first, 2);
        Mockito.verify(metadata).updateLastSeenEpochIfNewer(third, 3);
        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.any());
    }

    /** A KafkaException thrown by the event handler's addAndGet surfaces from committed(). */
    @Test
    public void testCommittedExceptionThrown() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
        Mockito.when(applicationEventHandler.addAndGet(
                ArgumentMatchers.any(FetchCommittedOffsetsApplicationEvent.class),
                ArgumentMatchers.any()))
            .thenAnswer(invocation -> {
                final CompletableApplicationEvent<?> event = invocation.getArgument(0);
                Assertions.assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event);
                throw new KafkaException("Test exception");
            });

        Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000L)));
    }

    /** A wakeup() issued before poll makes the first poll throw WakeupException; the second poll does not throw. */
    @Test
    public void testWakeupBeforeCallingPoll() {
        consumer = newConsumer();
        final TopicPartition tp = new TopicPartition("foo", 3);
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> committed =
            Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(committed);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        consumer.wakeup();

        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * When wakeup() is triggered while an empty fetch is collected, the in-flight poll throws
     * WakeupException and the following poll does not throw.
     */
    @Test
    public void testWakeupAfterEmptyFetch() {
        consumer = newConsumer();
        final TopicPartition tp = new TopicPartition("foo", 3);
        // The stubbed collector wakes the consumer up as a side effect of returning an empty fetch.
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.empty();
        }).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> committed =
            Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(committed);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * When wakeup() is triggered while a non-empty fetch is collected, the in-flight poll does not
     * throw, and the following poll throws WakeupException.
     */
    @Test
    public void testWakeupAfterNonEmptyFetch() {
        consumer = newConsumer();
        final TopicPartition tp = new TopicPartition("foo", 3);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>("foo", 3, 2L, "key1", "value1"),
            new ConsumerRecord<>("foo", 3, 3L, "key2", "value2"));
        // The stubbed collector wakes the consumer up as a side effect of returning the records.
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.forPartition(tp, records, true);
        }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> committed =
            Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(committed);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
    }

    /** With no wakeup() issued, a poll that returns records is followed by a second poll that does not throw. */
    @Test
    public void testClearWakeupTriggerAfterPoll() {
        consumer = newConsumer();
        final TopicPartition tp = new TopicPartition("foo", 3);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>("foo", 3, 2L, "key1", "value1"),
            new ConsumerRecord<>("foo", 3, 3L, "key2", "value2"));
        Mockito.doReturn(Fetch.forPartition(tp, records, true))
            .when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> committed =
            Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(committed);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        consumer.poll(Duration.ZERO);

        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * The commit callback is queued (callbacks() reports 1) and, once forced, runs on the thread
     * that is executing this test.
     */
    @Test
    public void testEnsureCallbackExecutedByApplicationThread() {
        consumer = newConsumer();
        final String testThreadName = Thread.currentThread().getName();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitApplicationEventExceptionally();

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
        Assertions.assertEquals(1, consumer.callbacks());

        forceCommitCallbackInvocation();
        Assertions.assertEquals(testThreadName, callback.completionThread);
    }

    /**
     * commitAsync with a callback that throws does not itself throw; a subsequent commitSync()
     * throws an exception of the callback failure's class.
     */
    @Test
    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final KafkaException callbackFailure = new KafkaException("Async commit callback failed");
        final OffsetCommitCallback failingCallback = (offsets, error) -> {
            throw callbackFailure;
        };

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), failingCallback));
        Assertions.assertThrows(callbackFailure.getClass(), () -> consumer.commitSync());
    }

    /** poll(long) throws UnsupportedOperationException with the expected message. */
    @Test
    public void testPollLongThrowsException() {
        consumer = newConsumer();
        final Exception thrown =
            Assertions.assertThrows(UnsupportedOperationException.class, () -> consumer.poll(0L));
        Assertions.assertEquals("Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". This method is deprecated and will be removed in the next major release.", thrown.getMessage());
    }

    /**
     * commitSync(offsets) forwards each offset's leader epoch to the metadata and enqueues a
     * CommitApplicationEvent.
     */
    @Test
    public void testCommitSyncLeaderEpochUpdate() {
        consumer = newConsumer();
        final TopicPartition first = new TopicPartition("t0", 2);
        final TopicPartition second = new TopicPartition("t0", 3);
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(first, new OffsetAndMetadata(10L, Optional.of(2), ""));
        offsets.put(second, new OffsetAndMetadata(20L, Optional.of(1), ""));
        completeCommitApplicationEventExceptionally();
        consumer.assign(Arrays.asList(first, second));

        Assertions.assertDoesNotThrow(() -> consumer.commitSync(offsets));

        Mockito.verify(metadata).updateLastSeenEpochIfNewer(first, 2);
        Mockito.verify(metadata).updateLastSeenEpochIfNewer(second, 1);
        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
    }

    /**
     * commitAsync(offsets, callback) forwards each offset's leader epoch to the metadata and
     * enqueues a CommitApplicationEvent.
     */
    @Test
    public void testCommitAsyncLeaderEpochUpdate() {
        final FetchBuffer fetchBuffer = Mockito.mock(FetchBuffer.class);
        final ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.emptyList());
        final ConsumerRebalanceListenerInvoker invoker = Mockito.mock(ConsumerRebalanceListenerInvoker.class);
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        final List<ConsumerPartitionAssignor> assignors = Collections.singletonList(new RoundRobinAssignor());
        consumer = newConsumer(fetchBuffer, interceptors, invoker, subscriptions, assignors, "group-id", "client-id");

        final TopicPartition first = new TopicPartition("t0", 2);
        final TopicPartition second = new TopicPartition("t0", 3);
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(first, new OffsetAndMetadata(10L, Optional.of(2), ""));
        offsets.put(second, new OffsetAndMetadata(20L, Optional.of(1), ""));

        // Both partitions report a leader on node 1 with epoch 1.
        Mockito.when(metadata.currentLeader(first))
            .thenReturn(new Metadata.LeaderAndEpoch(Optional.of(new Node(1, "host", 9000)), Optional.of(1)));
        Mockito.when(metadata.currentLeader(second))
            .thenReturn(new Metadata.LeaderAndEpoch(Optional.of(new Node(1, "host", 9000)), Optional.of(1)));
        consumer.assign(Arrays.asList(first, second));
        consumer.seek(first, 10L);
        consumer.seek(second, 20L);

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));

        Mockito.verify(metadata).updateLastSeenEpochIfNewer(first, 2);
        Mockito.verify(metadata).updateLastSeenEpochIfNewer(second, 1);
        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
    }

    /** A pending commitAsync callback is invoked exactly once, without an exception, by a poll. */
    @Test
    public void testEnsurePollExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitApplicationEventExceptionally();
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());
        consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));

        assertMockCommitCallbackInvoked(() -> consumer.poll(Duration.ZERO), callback, null);
    }

    /** A pending commitAsync callback is invoked exactly once, without an exception, by close(). */
    @Test
    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitApplicationEventExceptionally();

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));

        assertMockCommitCallbackInvoked(() -> consumer.close(), callback, null);
    }

    /**
     * close() sends a LeaveOnCloseApplicationEvent through addAndGet and a
     * CommitOnCloseApplicationEvent through add.
     */
    @Test
    public void testVerifyApplicationEventOnShutdown() {
        consumer = newConsumer();
        Mockito.doReturn((Object) null)
            .when(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(), ArgumentMatchers.any());

        consumer.close();

        Mockito.verify(applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(LeaveOnCloseApplicationEvent.class), ArgumentMatchers.any());
        Mockito.verify(applicationEventHandler)
            .add(ArgumentMatchers.any(CommitOnCloseApplicationEvent.class));
    }

    /** Closing a subscribed consumer leaves no assigned partitions and bumps the listener's revokedCount to 1. */
    @Test
    public void testPartitionRevocationOnClose() {
        final MockRebalanceListener listener = new MockRebalanceListener();
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            Collections.singletonList(new RoundRobinAssignor()),
            "group-id",
            "client-id");

        consumer.subscribe(Collections.singleton("topic"), listener);
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        consumer.close(Duration.ZERO);

        Assertions.assertTrue(subscriptions.assignedPartitions().isEmpty());
        Assertions.assertEquals(1, listener.revokedCount);
    }

    /**
     * When the rebalance listener throws from onPartitionsRevoked, close() throws KafkaException,
     * no LeaveOnCloseApplicationEvent is sent, the listener was still called, and the assignment
     * ends up empty.
     */
    @Test
    public void testFailedPartitionRevocationOnClose() {
        final ConsumerRebalanceListener listener = Mockito.mock(ConsumerRebalanceListener.class);
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            new ConsumerInterceptors<>(Collections.emptyList()),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            Collections.singletonList(new RoundRobinAssignor()),
            "group-id",
            "client-id");
        subscriptions.subscribe(Collections.singleton("topic"), Optional.of(listener));
        final TopicPartition tp = new TopicPartition("topic", 0);
        subscriptions.assignFromSubscribed(Collections.singleton(tp));
        Mockito.doThrow(new KafkaException())
            .when(listener)
            .onPartitionsRevoked(ArgumentMatchers.eq(Collections.singleton(tp)));

        Assertions.assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));

        Mockito.verify(applicationEventHandler, Mockito.never())
            .addAndGet(ArgumentMatchers.any(LeaveOnCloseApplicationEvent.class), ArgumentMatchers.any());
        Mockito.verify(listener).onPartitionsRevoked(ArgumentMatchers.eq(Collections.singleton(tp)));
        Assertions.assertEquals(Collections.emptySet(), subscriptions.assignedPartitions());
    }

    /**
     * completeQuietly never throws: a task that succeeds leaves the exception holder empty, and a
     * task that throws has its KafkaException recorded in the holder instead of propagated.
     *
     * <p>The final check uses {@code assertInstanceOf} so a failure reports the actual type.
     */
    @Test
    public void testCompleteQuietly() {
        final AtomicReference<Throwable> firstException = new AtomicReference<>();
        final CompletableFuture<Object> completedFuture = CompletableFuture.completedFuture(null);
        this.consumer = newConsumer();

        Assertions.assertDoesNotThrow(() -> {
            this.consumer.completeQuietly(() -> {
                completedFuture.get(0L, TimeUnit.MILLISECONDS);
            }, "test", firstException);
        });
        Assertions.assertNull(firstException.get());

        Assertions.assertDoesNotThrow(() -> {
            this.consumer.completeQuietly(() -> {
                throw new KafkaException("Test exception");
            }, "test", firstException);
        });
        Assertions.assertInstanceOf(KafkaException.class, firstException.get());
    }

    /** maybeAutoCommitSync(true, ...) enqueues a CommitApplicationEvent. */
    @Test
    public void testAutoCommitSyncEnabled() {
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            Collections.singletonList(new RoundRobinAssignor()),
            "group-id",
            "client-id");
        consumer.subscribe(Collections.singleton("topic"), Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        subscriptions.seek(new TopicPartition("topic", 0), 100L);

        consumer.maybeAutoCommitSync(true, time.timer(100L), (AtomicReference<Throwable>) null);

        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.any(CommitApplicationEvent.class));
    }

    /** maybeAutoCommitSync(false, ...) never enqueues a CommitApplicationEvent. */
    @Test
    public void testAutoCommitSyncDisabled() {
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            Collections.singletonList(new RoundRobinAssignor()),
            "group-id",
            "client-id");
        consumer.subscribe(Collections.singleton("topic"), Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        subscriptions.seek(new TopicPartition("topic", 0), 100L);

        consumer.maybeAutoCommitSync(false, time.timer(100L), (AtomicReference<Throwable>) null);

        Mockito.verify(applicationEventHandler, Mockito.never())
            .add(ArgumentMatchers.any(CommitApplicationEvent.class));
    }

    /**
     * Runs {@code executable} (which must not throw) and checks that the commit callback fired exactly once.
     *
     * @param executable         the action expected to trigger the callback
     * @param mockCommitCallback the callback whose {@code invoked} counter and {@code exception} are inspected
     * @param errors             the error the commit completed with; {@code null} means the callback must have
     *                           seen no exception, a retriable error means it must have seen a
     *                           {@code RetriableCommitFailedException}; other errors are not checked further
     */
    private void assertMockCommitCallbackInvoked(Executable executable, MockCommitCallback mockCommitCallback, Errors errors) {
        Assertions.assertDoesNotThrow(executable);
        Assertions.assertEquals(1, mockCommitCallback.invoked);

        if (errors == null) {
            Assertions.assertNull(mockCommitCallback.exception);
            return;
        }

        final boolean retriable = errors.exception() instanceof RetriableException;
        if (retriable) {
            Assertions.assertInstanceOf(RetriableCommitFailedException.class, mockCommitCallback.exception);
        }
    }

    /**
     * Manually assigning a partition leaves the subscription empty, records the partition in the assignment
     * and sends both an assignment-change event and a new-topics metadata update request.
     */
    @Test
    public void testAssign() {
        this.consumer = newConsumer();
        final TopicPartition assigned = new TopicPartition("foo", 3);

        this.consumer.assign(Collections.singleton(assigned));

        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().contains(assigned));
        Mockito.verify(this.applicationEventHandler)
                .add(ArgumentMatchers.any(AssignmentChangeApplicationEvent.class));
        Mockito.verify(this.applicationEventHandler)
                .add(ArgumentMatchers.any(NewTopicsMetadataUpdateRequestEvent.class));
    }

    /** Passing a {@code null} collection to {@code assign} must raise an {@code IllegalArgumentException}. */
    @Test
    public void testAssignOnNullTopicPartition() {
        this.consumer = newConsumer();
        final Collection<TopicPartition> missingPartitions = null;
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.assign(missingPartitions));
    }

    /**
     * Assigning an empty partition list is accepted and leaves both the subscription and the assignment empty.
     */
    @Test
    public void testAssignOnEmptyTopicPartition() {
        this.consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.assign(Collections.emptyList());

        final boolean noSubscription = this.consumer.subscription().isEmpty();
        Assertions.assertTrue(noSubscription);
        final boolean noAssignment = this.consumer.assignment().isEmpty();
        Assertions.assertTrue(noAssignment);
    }

    /** A partition whose topic name is {@code null} must be rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testAssignOnNullTopicInPartition() {
        this.consumer = newConsumer();
        final String missingTopic = null;
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> this.consumer.assign(Collections.singleton(new TopicPartition(missingTopic, 0))));
    }

    /** A partition whose topic name is only whitespace must be rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testAssignOnEmptyTopicInPartition() {
        this.consumer = newConsumer();
        final String blankTopic = "  ";
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> this.consumer.assign(Collections.singleton(new TopicPartition(blankTopic, 0))));
    }

    /** {@code beginningOffsets} must throw a {@code NullPointerException} for a {@code null} partition collection. */
    @Test
    public void testBeginningOffsetsFailsIfNullPartitions() {
        this.consumer = newConsumer();
        final Collection<TopicPartition> missingPartitions = null;
        final Duration timeout = Duration.ofMillis(1L);
        Assertions.assertThrows(
                NullPointerException.class,
                () -> this.consumer.beginningOffsets(missingPartitions, timeout));
    }

    /**
     * {@code beginningOffsets} returns, per partition, the offset part of the result the event handler
     * produced, and issues exactly one {@code ListOffsetsApplicationEvent}.
     */
    @Test
    public void testBeginningOffsets() {
        this.consumer = newConsumer();
        final HashMap<TopicPartition, OffsetAndTimestamp> handlerResult = mockOffsetAndTimestamp();
        final Set<TopicPartition> partitions = handlerResult.keySet();
        Mockito.doReturn(handlerResult)
                .when(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.any(), ArgumentMatchers.any());

        // Expected result: the same partitions, each mapped to just the offset.
        final Map<TopicPartition, Long> expected = handlerResult.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
        final Map<TopicPartition, Long> actual = Assertions.assertDoesNotThrow(
                () -> this.consumer.beginningOffsets(partitions, Duration.ofMillis(1L)));

        Assertions.assertEquals(expected, actual);
        Mockito.verify(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * A {@code KafkaException} thrown by the event handler while processing the list-offsets event must
     * surface unchanged from {@code beginningOffsets}.
     */
    @Test
    public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() {
        this.consumer = newConsumer();
        final Set<TopicPartition> partitions = mockTopicPartitionOffset().keySet();
        final Throwable failure = new KafkaException("Unexpected failure processing List Offsets event");
        Mockito.doThrow(failure)
                .when(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.any(ListOffsetsApplicationEvent.class), ArgumentMatchers.any());

        final Throwable thrown = Assertions.assertThrows(
                KafkaException.class,
                () -> this.consumer.beginningOffsets(partitions, Duration.ofMillis(1L)));

        Assertions.assertEquals(failure, thrown);
        Mockito.verify(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * A {@code TimeoutException} raised by the event handler must propagate out of {@code beginningOffsets},
     * and a {@code ListOffsetsApplicationEvent} must still have been submitted.
     */
    @Test
    public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
        this.consumer = newConsumer();
        Mockito.doThrow(new TimeoutException())
                .when(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.any(), ArgumentMatchers.any());

        Assertions.assertThrows(
                TimeoutException.class,
                () -> this.consumer.beginningOffsets(
                        Collections.singletonList(new TopicPartition("t1", 0)),
                        Duration.ofMillis(1L)));

        Mockito.verify(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /** {@code offsetsForTimes} must throw a {@code NullPointerException} when given a {@code null} map. */
    @Test
    public void testOffsetsForTimesOnNullPartitions() {
        this.consumer = newConsumer();
        final Map<TopicPartition, Long> missingTimestamps = null;
        final Duration timeout = Duration.ofMillis(1L);
        Assertions.assertThrows(
                NullPointerException.class,
                () -> this.consumer.offsetsForTimes(missingTimestamps, timeout));
    }

    /**
     * {@code offsetsForTimes} must reject each of the negative target timestamps -2, -1 and -3 with an
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
        this.consumer = newConsumer();
        // Checked in the same order as before: -2, then -1, then -3.
        final long[] negativeTimestamps = {-2L, -1L, -3L};
        for (final long timestamp : negativeTimestamps) {
            Assertions.assertThrows(
                    IllegalArgumentException.class,
                    () -> this.consumer.offsetsForTimes(
                            Collections.singletonMap(new TopicPartition("topic1", 1), timestamp),
                            Duration.ofMillis(1L)));
        }
    }

    /**
     * {@code offsetsForTimes} returns exactly what the event handler produced for the submitted
     * {@code ListOffsetsApplicationEvent}.
     */
    @Test
    public void testOffsetsForTimes() {
        this.consumer = newConsumer();
        final HashMap<TopicPartition, OffsetAndTimestamp> handlerResult = mockOffsetAndTimestamp();
        final HashMap<TopicPartition, Long> timestampsToSearch = mockTimestampToSearch();
        Mockito.doReturn(handlerResult)
                .when(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.any(), ArgumentMatchers.any());

        final Map<TopicPartition, OffsetAndTimestamp> actual = Assertions.assertDoesNotThrow(
                () -> this.consumer.offsetsForTimes(timestampsToSearch, Duration.ofMillis(1L)));

        Assertions.assertEquals(handlerResult, actual);
        Mockito.verify(this.applicationEventHandler)
                .addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * With a zero timeout, {@code offsetsForTimes} maps every requested partition to {@code null} and never
     * submits a {@code ListOffsetsApplicationEvent} through {@code addAndGet}.
     */
    @Test
    public void testOffsetsForTimesWithZeroTimeout() {
        this.consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("topic1", 0);
        final Map<TopicPartition, OffsetAndTimestamp> expected = Collections.singletonMap(partition, null);
        final Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(partition, 5L);

        final Map<TopicPartition, OffsetAndTimestamp> actual = Assertions.assertDoesNotThrow(
                () -> this.consumer.offsetsForTimes(timestampsToSearch, Duration.ZERO));

        Assertions.assertEquals(expected, actual);
        Mockito.verify(this.applicationEventHandler, Mockito.never())
                .addAndGet(ArgumentMatchers.isA(ListOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * Calling {@code wakeup()} before {@code committed()} makes {@code committed()} throw a
     * {@code WakeupException}; afterwards the wakeup trigger holds no pending task.
     *
     * <p>The stubbed handler checks that the event it receives is a
     * {@code FetchCommittedOffsetsApplicationEvent} whose future is already completed exceptionally, then
     * resolves that future via {@code ConsumerUtils.getResult}.
     */
    @Test
    public void testWakeupCommitted() {
        this.consumer = newConsumer();
        final HashMap<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
        Mockito.doAnswer(invocation -> {
            final CompletableApplicationEvent event = invocation.getArgument(0);
            final Timer timer = invocation.getArgument(1);
            Assertions.assertInstanceOf(FetchCommittedOffsetsApplicationEvent.class, event);
            Assertions.assertTrue(event.future().isCompletedExceptionally());
            return ConsumerUtils.getResult(event.future(), timer);
        })
                .when(this.applicationEventHandler)
                .addAndGet(
                        ArgumentMatchers.any(FetchCommittedOffsetsApplicationEvent.class),
                        ArgumentMatchers.any(Timer.class));

        this.consumer.wakeup();

        Assertions.assertThrows(WakeupException.class, () -> this.consumer.committed(offsets.keySet()));
        Assertions.assertNull(this.consumer.wakeupTrigger().getPendingTask());
    }

    /**
     * Runs the shared refresh-committed-offsets scenario for partition {@code t1-1} with a committed
     * offset of 10.
     */
    @Test
    public void testRefreshCommittedOffsetsSuccess() {
        this.consumer = newConsumer();
        final TopicPartition partition = new TopicPartition("t1", 1);
        final Set<TopicPartition> partitions = Collections.singleton(partition);
        final Map<TopicPartition, OffsetAndMetadata> committed =
                Collections.singletonMap(partition, new OffsetAndMetadata(10L));
        testRefreshCommittedOffsetsSuccess(partitions, committed);
    }

    /**
     * Runs the shared refresh-committed-offsets scenario for partition {@code t1-1} when the fetch of
     * committed offsets succeeds but returns an empty map.
     */
    @Test
    public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() {
        this.consumer = newConsumer();
        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("t1", 1));
        final Map<TopicPartition, OffsetAndMetadata> nothingCommitted = Collections.emptyMap();
        testRefreshCommittedOffsetsSuccess(partitions, nothingCommitted);
    }

    /**
     * With a group id configured, a timed-out committed-offsets fetch must be attempted and must not be
     * followed by a position reset (delegates to the shared timeout scenario with {@code true}).
     */
    @Test
    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
        this.consumer = newConsumer();
        final boolean committedOffsetsFetchExpected = true;
        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(committedOffsetsFetchExpected);
    }

    /**
     * Without a group id, committed offsets must never be fetched and positions are reset instead
     * (delegates to the shared timeout scenario with {@code false}).
     */
    @Test
    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
        this.consumer = newConsumerWithoutGroupId();
        final boolean committedOffsetsFetchExpected = false;
        testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(committedOffsetsFetchExpected);
    }

    /**
     * Subscribing to a topic records it in the subscription, leaves the assignment empty and sends a
     * {@code SubscriptionChangeApplicationEvent}.
     */
    @Test
    public void testSubscribeGeneratesEvent() {
        this.consumer = newConsumer();
        final String topic = "topic1";

        this.consumer.subscribe(Collections.singletonList(topic));

        Assertions.assertEquals(Collections.singleton(topic), this.consumer.subscription());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
        Mockito.verify(this.applicationEventHandler)
                .add(ArgumentMatchers.isA(SubscriptionChangeApplicationEvent.class));
    }

    /**
     * {@code unsubscribe()} leaves subscription and assignment empty and sends an
     * {@code UnsubscribeApplicationEvent}.
     */
    @Test
    public void testUnsubscribeGeneratesUnsubscribeEvent() {
        this.consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        final boolean noSubscription = this.consumer.subscription().isEmpty();
        Assertions.assertTrue(noSubscription);
        final boolean noAssignment = this.consumer.assignment().isEmpty();
        Assertions.assertTrue(noAssignment);
        Mockito.verify(this.applicationEventHandler)
                .add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
    }

    /**
     * Subscribing to an empty topic list behaves like an unsubscribe: nothing subscribed, nothing assigned,
     * and an {@code UnsubscribeApplicationEvent} is sent.
     */
    @Test
    public void testSubscribeToEmptyListActsAsUnsubscribe() {
        this.consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.subscribe(Collections.emptyList());

        final boolean noSubscription = this.consumer.subscription().isEmpty();
        Assertions.assertTrue(noSubscription);
        final boolean noAssignment = this.consumer.assignment().isEmpty();
        Assertions.assertTrue(noAssignment);
        Mockito.verify(this.applicationEventHandler)
                .add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
    }

    /** Subscribing with a {@code null} topic list must raise an {@code IllegalArgumentException}. */
    @Test
    public void testSubscribeToNullTopicCollection() {
        this.consumer = newConsumer();
        // Typed as a List so the collection-based subscribe overload is selected.
        final List<String> missingTopics = null;
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe(missingTopics));
    }

    /** A topic list containing a {@code null} entry must be rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testSubscriptionOnNullTopic() {
        this.consumer = newConsumer();
        final String missingTopic = null;
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> this.consumer.subscribe(Collections.singletonList(missingTopic)));
    }

    /** A whitespace-only topic name must be rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testSubscriptionOnEmptyTopic() {
        this.consumer = newConsumer();
        final String blankTopic = "  ";
        Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> this.consumer.subscribe(Collections.singletonList(blankTopic)));
    }

    /**
     * Without a {@code group.id}, the auto-commit interval and stable-offset settings must not show up as
     * unused configs, and {@code groupMetadata()} must fail with an {@code InvalidGroupIdException} carrying
     * the expected message.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerProperties());
        this.consumer = newConsumer(config);

        for (final String key : Arrays.asList(
                "auto.commit.interval.ms",
                "internal.throw.on.fetch.stable.offset.unsupported")) {
            Assertions.assertFalse(config.unused().contains(key));
        }

        final String expectedMessage = "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.";
        final InvalidGroupIdException thrown =
                Assertions.assertThrows(InvalidGroupIdException.class, this.consumer::groupMetadata);
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Right after creation with a group id, the group metadata reports that group id, no group instance id,
     * generation -1 and an empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
        final String groupId = "consumerGroupA";
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId));
        this.consumer = newConsumer(config);

        final ConsumerGroupMetadata metadata = this.consumer.groupMetadata();

        Assertions.assertEquals(groupId, metadata.groupId());
        Assertions.assertEquals(Optional.empty(), metadata.groupInstanceId());
        Assertions.assertEquals(-1, metadata.generationId());
        Assertions.assertEquals("", metadata.memberId());
    }

    /**
     * Right after creation with both a group id and a group instance id, the group metadata reports both,
     * together with generation -1 and an empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
        final String groupId = "consumerGroupA";
        final String groupInstanceId = "groupInstanceId1";
        final Properties props = requiredConsumerPropertiesAndGroupId(groupId);
        props.put("group.instance.id", groupInstanceId);
        this.consumer = newConsumer(new ConsumerConfig(props));

        final ConsumerGroupMetadata metadata = this.consumer.groupMetadata();

        Assertions.assertEquals(groupId, metadata.groupId());
        Assertions.assertEquals(Optional.of(groupInstanceId), metadata.groupInstanceId());
        Assertions.assertEquals(-1, metadata.generationId());
        Assertions.assertEquals("", metadata.memberId());
    }

    /**
     * A {@code GroupMetadataUpdateEvent} queued as a background event is picked up by {@code poll} and
     * reflected in {@code groupMetadata()}; a second call without further updates returns the same value.
     */
    @Test
    public void testGroupMetadataUpdateSingleCall() {
        final String groupId = "consumerGroupA";
        final int generation = 1;
        final String memberId = "newMemberId";

        this.consumer = newConsumer(new ConsumerConfig(requiredConsumerPropertiesAndGroupId(groupId)));
        Mockito.doReturn(Fetch.empty())
                .when(this.fetchCollector)
                .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());

        final ConsumerGroupMetadata expected =
                new ConsumerGroupMetadata(groupId, generation, memberId, Optional.empty());
        this.backgroundEventQueue.add(new GroupMetadataUpdateEvent(generation, memberId));
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        this.consumer.poll(Duration.ZERO);

        final ConsumerGroupMetadata firstRead = this.consumer.groupMetadata();
        Assertions.assertEquals(expected, firstRead);
        final ConsumerGroupMetadata secondRead = this.consumer.groupMetadata();
        Assertions.assertEquals(expected, secondRead);
    }

    /**
     * For each listener method name in {@code list}, queues a
     * {@code ConsumerRebalanceListenerCallbackNeededEvent} and polls once, then checks how often each
     * rebalance-listener callback ran.
     *
     * @param list      the listener methods to request, in order
     * @param optional  first optional error handed to the {@code CounterConsumerRebalanceListener} constructor
     * @param optional2 second optional error handed to the listener constructor
     * @param optional3 third optional error handed to the listener constructor
     * @param i         expected {@code revokedCount()}
     * @param i2        expected {@code assignedCount()}
     * @param i3        expected {@code lostCount()}
     */
    @MethodSource({"listenerCallbacksInvokeSource"})
    @ParameterizedTest
    public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> list, Optional<RuntimeException> optional, Optional<RuntimeException> optional2, Optional<RuntimeException> optional3, int i, int i2, int i3) {
        this.consumer = newConsumer();
        final CounterConsumerRebalanceListener listener =
                new CounterConsumerRebalanceListener(optional, optional2, optional3);
        Mockito.doReturn(Fetch.empty())
                .when(this.fetchCollector)
                .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.consumer.subscribe(Collections.singletonList("topic"), listener);

        // One background event and one poll per requested callback.
        for (final ConsumerRebalanceListenerMethodName methodName : list) {
            this.backgroundEventQueue.add(
                    new ConsumerRebalanceListenerCallbackNeededEvent(methodName, Collections.emptySortedSet()));
            this.consumer.poll(Duration.ZERO);
        }

        Assertions.assertEquals(i, listener.revokedCount());
        Assertions.assertEquals(i2, listener.assignedCount());
        Assertions.assertEquals(i3, listener.lostCount());
    }

    /**
     * Argument source for {@code testListenerCallbacksInvoke}. Each row is: the listener methods to request,
     * three optional errors passed to the listener constructor, then the expected revoked, assigned and
     * lost callback counts.
     */
    private static Stream<Arguments> listenerCallbacksInvokeSource() {
        final Optional<RuntimeException> none = Optional.empty();
        final Optional<RuntimeException> failure = Optional.of(new RuntimeException("Intentional error"));
        final ConsumerRebalanceListenerMethodName revoked = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
        final ConsumerRebalanceListenerMethodName assigned = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
        final ConsumerRebalanceListenerMethodName lost = ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;

        return Stream.of(
                // No event queued.
                Arguments.of(Collections.emptyList(), none, none, none, 0, 0, 0),
                // A single event, no errors supplied.
                Arguments.of(Collections.singletonList(revoked), none, none, none, 1, 0, 0),
                Arguments.of(Collections.singletonList(assigned), none, none, none, 0, 1, 0),
                Arguments.of(Collections.singletonList(lost), none, none, none, 0, 0, 1),
                // A single event, with an error supplied in the matching position.
                Arguments.of(Collections.singletonList(revoked), failure, none, none, 1, 0, 0),
                Arguments.of(Collections.singletonList(assigned), none, failure, none, 0, 1, 0),
                Arguments.of(Collections.singletonList(lost), none, none, failure, 0, 0, 1),
                // Two events, with an error supplied in the first position.
                Arguments.of(Arrays.asList(revoked, assigned), failure, none, none, 1, 1, 0));
    }

    /**
     * An {@code ErrorBackgroundEvent} waiting in the background queue causes {@code poll} to throw a
     * {@code KafkaException} with the same message.
     */
    @Test
    public void testBackgroundError() {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
        this.consumer = newConsumer(config);
        final KafkaException queuedError = new KafkaException("Nobody expects the Spanish Inquisition");
        this.backgroundEventQueue.add(new ErrorBackgroundEvent(queuedError));
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));

        final String expectedMessage = queuedError.getMessage();
        final KafkaException thrown =
                Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));

        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * With two {@code ErrorBackgroundEvent}s queued, {@code poll} throws a {@code KafkaException} carrying
     * the first error's message, and the background queue is empty afterwards.
     */
    @Test
    public void testMultipleBackgroundErrors() {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId("consumerGroupA"));
        this.consumer = newConsumer(config);
        final KafkaException firstError = new KafkaException("Nobody expects the Spanish Inquisition");
        final KafkaException secondError = new KafkaException("Spam, Spam, Spam");
        this.backgroundEventQueue.add(new ErrorBackgroundEvent(firstError));
        this.backgroundEventQueue.add(new ErrorBackgroundEvent(secondError));
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));

        final String expectedMessage = firstError.getMessage();
        final KafkaException thrown =
                Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));

        Assertions.assertEquals(expectedMessage, thrown.getMessage());
        Assertions.assertTrue(this.backgroundEventQueue.isEmpty());
    }

    /** Without a group id, {@code group.remote.assignor} must be reported as an unused config. */
    @Test
    public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
        final String remoteAssignorKey = "group.remote.assignor";
        final Properties props = requiredConsumerProperties();
        props.put(remoteAssignorKey, "someAssignor");
        final ConsumerConfig config = new ConsumerConfig(props);

        this.consumer = newConsumer(config);

        Assertions.assertTrue(config.unused().contains(remoteAssignorKey));
    }

    /**
     * With the {@code CLASSIC} group protocol selected, {@code group.remote.assignor} must be reported as an
     * unused config.
     */
    @Test
    public void testGroupRemoteAssignorUnusedInGenericProtocol() {
        final String remoteAssignorKey = "group.remote.assignor";
        final String protocol = GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
        final Properties props = requiredConsumerProperties();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", protocol);
        props.put(remoteAssignorKey, "someAssignor");
        final ConsumerConfig config = new ConsumerConfig(props);

        this.consumer = newConsumer(config);

        Assertions.assertTrue(config.unused().contains(remoteAssignorKey));
    }

    /**
     * With the {@code CONSUMER} group protocol selected, {@code group.remote.assignor} must not be reported
     * as an unused config.
     */
    @Test
    public void testGroupRemoteAssignorUsedInConsumerProtocol() {
        final String remoteAssignorKey = "group.remote.assignor";
        final String protocol = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
        final Properties props = requiredConsumerProperties();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", protocol);
        props.put(remoteAssignorKey, "someAssignor");
        final ConsumerConfig config = new ConsumerConfig(props);

        this.consumer = newConsumer(config);

        Assertions.assertFalse(config.unused().contains(remoteAssignorKey));
    }

    /**
     * Without a group id, the explicitly set auto-commit interval and stable-offset settings must not be
     * reported as unused configs.
     */
    @Test
    public void testGroupIdNull() {
        final Properties props = requiredConsumerProperties();
        props.put("auto.commit.interval.ms", 10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", true);
        final ConsumerConfig config = new ConsumerConfig(props);

        this.consumer = newConsumer(config);

        for (final String key : Arrays.asList(
                "auto.commit.interval.ms",
                "internal.throw.on.fetch.stable.offset.unsupported")) {
            Assertions.assertFalse(config.unused().contains(key));
        }
    }

    /**
     * With a valid group id, the explicitly set auto-commit interval and stable-offset settings are expected
     * to be reported as unused configs after the consumer is built by this test's factory.
     */
    @Test
    public void testGroupIdNotNullAndValid() {
        final Properties props = requiredConsumerPropertiesAndGroupId("consumerGroupA");
        props.put("auto.commit.interval.ms", 10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", true);
        final ConsumerConfig config = new ConsumerConfig(props);

        this.consumer = newConsumer(config);

        for (final String key : Arrays.asList(
                "auto.commit.interval.ms",
                "internal.throw.on.fetch.stable.offset.unsupported")) {
            Assertions.assertTrue(config.unused().contains(key));
        }
    }

    /** An empty group id must make consumer construction fail. */
    @Test
    public void testGroupIdEmpty() {
        final String emptyGroupId = "";
        testInvalidGroupId(emptyGroupId);
    }

    /** A group id consisting only of spaces must make consumer construction fail. */
    @Test
    public void testGroupIdOnlyWhitespaces() {
        final String blankGroupId = "       ";
        testInvalidGroupId(blankGroupId);
    }

    /**
     * A call to {@code poll} must send a {@code PollApplicationEvent} to the application event handler.
     * The fetch collector is stubbed to return one record so the poll has something to hand back.
     */
    @Test
    public void testEnsurePollEventSentOnConsumerPoll() {
        final SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        this.consumer = newConsumer(
                Mockito.mock(FetchBuffer.class),
                new ConsumerInterceptors<>(Collections.emptyList()),
                Mockito.mock(ConsumerRebalanceListenerInvoker.class),
                subscriptions,
                Collections.singletonList(new RoundRobinAssignor()),
                "group-id",
                "client-id");

        final TopicPartition partition = new TopicPartition("topic", 0);
        final List<ConsumerRecord<String, String>> records =
                Collections.singletonList(new ConsumerRecord<>("topic", 0, 2L, "key1", "value1"));
        Mockito.doAnswer(invocation -> Fetch.forPartition(partition, records, true))
                .when(this.fetchCollector)
                .collectFetch(Mockito.any(FetchBuffer.class));

        this.consumer.subscribe(Collections.singletonList("topic1"));
        this.consumer.poll(Duration.ofMillis(100L));

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(PollApplicationEvent.class));
    }

    /**
     * Asserts that building a consumer with the given group id fails with a {@code KafkaException} whose
     * message is "Failed to construct kafka consumer".
     *
     * @param str the invalid group id to configure
     */
    private void testInvalidGroupId(String str) {
        final ConsumerConfig config = new ConsumerConfig(requiredConsumerPropertiesAndGroupId(str));

        final Exception thrown = Assertions.assertThrows(KafkaException.class, () -> {
            this.consumer = newConsumer(config);
        });

        Assertions.assertEquals("Failed to construct kafka consumer", thrown.getMessage());
    }

    /**
     * Builds the minimal consumer properties and adds the given {@code group.id}.
     *
     * @param str the group id to set
     * @return a fresh {@code Properties} instance holding the required settings plus {@code group.id}
     */
    private Properties requiredConsumerPropertiesAndGroupId(String str) {
        final Properties props = requiredConsumerProperties();
        props.put("group.id", str);
        return props;
    }

    /**
     * Builds the minimal consumer properties used by these tests: string key/value deserializers and a
     * bootstrap server at {@code localhost:9091}.
     *
     * @return a fresh {@code Properties} instance
     */
    private Properties requiredConsumerProperties() {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9091");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        return props;
    }

    /**
     * Shared scenario: the committed-offsets fetch is stubbed to fail with a {@code TimeoutException}, the
     * consumer is assigned partition {@code t1-1} and polled once. Position validation must always be
     * requested; the remaining expectations depend on {@code z}.
     *
     * @param z {@code true} if a committed-offsets fetch is expected (and then no position reset);
     *          {@code false} if no committed-offsets fetch is expected (and then a position reset)
     */
    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean z) {
        completeFetchedCommittedOffsetApplicationEventExceptionally(new TimeoutException());
        Mockito.doReturn(Fetch.empty())
                .when(this.fetchCollector)
                .collectFetch(ArgumentMatchers.any(FetchBuffer.class));

        this.consumer.assign(Collections.singleton(new TopicPartition("t1", 1)));
        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));

        if (!z) {
            // No committed-offsets fetch: positions are reset instead.
            Mockito.verify(this.applicationEventHandler, Mockito.never())
                    .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
            Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                    .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
            return;
        }

        // The committed-offsets fetch was attempted and timed out: no reset may follow.
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
        Mockito.verify(this.applicationEventHandler, Mockito.never())
                .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * Shared scenario: the committed-offsets fetch is stubbed to succeed with {@code map}, the consumer is
     * assigned {@code set} and polled once. Position validation, a committed-offsets fetch and a position
     * reset must each have been requested at least once.
     *
     * @param set the partitions to assign
     * @param map the committed offsets the stubbed fetch returns
     */
    private void testRefreshCommittedOffsetsSuccess(Set<TopicPartition> set, Map<TopicPartition, OffsetAndMetadata> map) {
        completeFetchedCommittedOffsetApplicationEventSuccessfully(map);
        Mockito.doReturn(Fetch.empty())
                .when(this.fetchCollector)
                .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch())
                .when(this.metadata)
                .currentLeader(ArgumentMatchers.any());

        this.consumer.assign(set);
        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                .addAndGet(ArgumentMatchers.isA(ValidatePositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                .addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
                .addAndGet(ArgumentMatchers.isA(ResetPositionsApplicationEvent.class), ArgumentMatchers.isA(Timer.class));
    }

    /**
     * A poll with a long timeout returns once records become available. The first stubbed
     * {@code collectFetch} call assigns partition {@code topic1-3} and returns an empty fetch; the second
     * returns two records, which the single {@code poll} call is expected to deliver.
     */
    @Test
    public void testLongPollWaitIsLimited() {
        final String topic = "topic1";
        this.consumer = newConsumer();
        this.consumer.subscribe(Collections.singletonList(topic));
        Assertions.assertEquals(Collections.singleton(topic), this.consumer.subscription());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());

        final TopicPartition partition = new TopicPartition(topic, 3);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
                new ConsumerRecord<>(topic, 3, 2L, "key1", "value1"),
                new ConsumerRecord<>(topic, 3, 3L, "key2", "value2"));
        Mockito.doAnswer(firstCall -> {
            this.consumer.subscriptions().assignFromSubscribed(Collections.singleton(partition));
            return Fetch.empty();
        })
                .doAnswer(secondCall -> Fetch.forPartition(partition, records, true))
                .when(this.fetchCollector)
                .collectFetch(ArgumentMatchers.any(FetchBuffer.class));

        final int polledCount = this.consumer.poll(Duration.ofMillis(10000L)).count();

        Assertions.assertEquals(2, polledCount);
        Assertions.assertEquals(Collections.singleton(topic), this.consumer.subscription());
        Assertions.assertEquals(Collections.singleton(partition), this.consumer.assignment());
    }

    /**
     * Verifies that {@code processBackgroundEvents} keeps retrying when waiting on the future times out and
     * returns once the future finally yields.
     *
     * <p>The mocked {@code get(timeout, unit)} advances the mock timer by the requested timeout and throws a
     * {@code java.util.concurrent.TimeoutException} on its first two invocations; the third invocation
     * returns normally. The timer starts at 1000 ms and must have 800 ms left afterwards, i.e. the two
     * timed-out waits are expected to consume 200 ms in total.
     *
     * @throws Exception declared because the stubbed {@code get(long, TimeUnit)} call declares checked exceptions
     */
    @Test
    public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
        this.consumer = newConsumer();
        final Timer timer = new MockTime().timer(1000L);
        final CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);
        final CountDownLatch remainingCalls = new CountDownLatch(3);

        Mockito.doAnswer(invocation -> {
            remainingCalls.countDown();
            if (remainingCalls.getCount() > 0) {
                // Simulate a wait that used up the whole requested timeout.
                final long requestedTimeoutMs = invocation.getArgument(0, Long.class);
                timer.sleep(requestedTimeoutMs);
                throw new java.util.concurrent.TimeoutException("Intentional timeout");
            }
            future.complete(null);
            return null;
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        // try-with-resources closes the processor and records suppressed close failures,
        // replacing the hand-written close/addSuppressed boilerplate.
        try (EventProcessor eventProcessor = Mockito.mock(EventProcessor.class)) {
            this.consumer.processBackgroundEvents(eventProcessor, future, timer);
            Assertions.assertEquals(800L, timer.remainingMs());
        }
    }

    /**
     * Verifies that {@code processBackgroundEvents} consumes no timer time when the future it waits on is
     * already complete: the mock timer starts at 1000 ms and must still report 1000 ms afterwards.
     */
    @Test
    public void testProcessBackgroundEventsWithoutDelay() {
        this.consumer = newConsumer();
        final Timer timer = new MockTime().timer(1000L);
        final CompletableFuture<?> alreadyDone = CompletableFuture.completedFuture(null);

        // try-with-resources closes the processor and records suppressed close failures,
        // replacing the hand-written close/addSuppressed boilerplate.
        try (EventProcessor eventProcessor = Mockito.mock(EventProcessor.class)) {
            this.consumer.processBackgroundEvents(eventProcessor, alreadyDone, timer);
            Assertions.assertEquals(1000L, timer.remainingMs());
        }
    }

    /**
     * Exercises {@code processBackgroundEvents} against a future that never completes.
     *
     * <p>Every timed {@code get} on the mocked future advances the mock timer by the requested
     * timeout and then throws {@link java.util.concurrent.TimeoutException}. The test expects the
     * call to fail with {@code TimeoutException} (the type this file imports under that simple
     * name; presumably not the {@code java.util.concurrent} one, since that is spelled out in
     * full below; confirm against the import list) and expects the timer to be fully exhausted.
     *
     * @throws Exception declared because stubbing the timed {@code Future.get} names a method
     *                   with checked exceptions
     */
    @Test
    public void testProcessBackgroundEventsTimesOut() throws Exception {
        this.consumer = newConsumer();
        Timer timer = new MockTime().timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);

        Mockito.doAnswer(invocation -> {
            // Simulate having blocked for the full timeout that the caller requested.
            long requestedTimeoutMs = invocation.getArgument(0, Long.class);
            timer.sleep(requestedTimeoutMs);
            throw new java.util.concurrent.TimeoutException("Intentional timeout");
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        // try-with-resources closes the processor on both the success and the failure path.
        try (EventProcessor<?> processor = Mockito.mock(EventProcessor.class)) {
            Assertions.assertThrows(TimeoutException.class,
                () -> this.consumer.processBackgroundEvents(processor, future, timer));
            Assertions.assertEquals(0L, timer.remainingMs());
        }
    }

    /**
     * Builds an offsets fixture for two partitions of topic {@code "t0"}.
     *
     * @return a new mutable map with partition 2 mapped to {@code OffsetAndMetadata(10)} and
     *         partition 3 mapped to {@code OffsetAndMetadata(20)}
     */
    private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        offsets.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));
        return offsets;
    }

    /**
     * Builds an offset-and-timestamp fixture for two partitions of topic {@code "t0"}.
     *
     * @return a new mutable map with partition 2 mapped to {@code OffsetAndTimestamp(5, 1)} and
     *         partition 3 mapped to {@code OffsetAndTimestamp(6, 3)}
     */
    private HashMap<TopicPartition, OffsetAndTimestamp> mockOffsetAndTimestamp() {
        final HashMap<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
        result.put(new TopicPartition("t0", 2), new OffsetAndTimestamp(5L, 1L));
        result.put(new TopicPartition("t0", 3), new OffsetAndTimestamp(6L, 3L));
        return result;
    }

    /**
     * Builds a partition-to-{@code Long} fixture for two partitions of topic {@code "t0"}.
     *
     * @return a new mutable map with partition 2 mapped to {@code 1} and partition 3 mapped to
     *         {@code 2}
     */
    private HashMap<TopicPartition, Long> mockTimestampToSearch() {
        final HashMap<TopicPartition, Long> timestamps = new HashMap<>();
        timestamps.put(new TopicPartition("t0", 2), 1L);
        timestamps.put(new TopicPartition("t0", 3), 2L);
        return timestamps;
    }

    /**
     * Stubs the application event handler so that every {@code CommitApplicationEvent} passed to
     * {@code add} has its future completed exceptionally.
     *
     * @param exc the exception the event's future is failed with
     */
    private void completeCommitApplicationEventExceptionally(Exception exc) {
        Mockito.doAnswer(invocation -> {
            CommitApplicationEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().completeExceptionally(exc);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@code CommitApplicationEvent} passed to
     * {@code add} has its future completed normally with {@code null}.
     *
     * <p>NOTE(review): despite the "Exceptionally" suffix, this no-argument overload completes the
     * future successfully. The name looks like a misnomer; callers are outside this section, so
     * confirm their expectations before renaming.
     */
    private void completeCommitApplicationEventExceptionally() {
        Mockito.doAnswer(invocation -> {
            CommitApplicationEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().complete(null);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
    }

    /**
     * Stubs the application event handler so that {@code addAndGet} returns the given map for any
     * {@code FetchCommittedOffsetsApplicationEvent} and any {@code Timer}.
     *
     * @param map the value {@code addAndGet} is stubbed to return
     */
    private void completeFetchedCommittedOffsetApplicationEventSuccessfully(Map<TopicPartition, OffsetAndMetadata> map) {
        Mockito.doReturn(map)
            .when(this.applicationEventHandler)
            .addAndGet(
                ArgumentMatchers.any(FetchCommittedOffsetsApplicationEvent.class),
                ArgumentMatchers.any(Timer.class));
    }

    /**
     * Stubs the application event handler so that {@code addAndGet} throws the given exception for
     * any {@code FetchCommittedOffsetsApplicationEvent} and any {@code Timer}.
     *
     * @param exc the exception {@code addAndGet} is stubbed to throw
     */
    private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception exc) {
        Mockito.doThrow(exc)
            .when(this.applicationEventHandler)
            .addAndGet(
                ArgumentMatchers.any(FetchCommittedOffsetsApplicationEvent.class),
                ArgumentMatchers.any(Timer.class));
    }

    /**
     * Stubs the application event handler so that every {@code UnsubscribeApplicationEvent} passed
     * to {@code add} has its future completed normally with {@code null}.
     */
    private void completeUnsubscribeApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            UnsubscribeApplicationEvent unsubscribeEvent = invocation.getArgument(0);
            unsubscribeEvent.future().complete(null);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeApplicationEvent.class));
    }

    /**
     * Issues a no-argument {@code commitAsync()} on the consumer under test.
     *
     * <p>NOTE(review): per the method name this is used to trigger invocation of commit callbacks;
     * presumably {@code commitAsync()} runs pending callbacks as a side effect; confirm against
     * the consumer implementation.
     */
    private void forceCommitCallbackInvocation() {
        consumer.commitAsync();
    }
}
