package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.class */
public abstract class ConsumerCoordinatorTest {
    // Rebalance configuration; rebuilt by setup() before each test.
    private GroupRebalanceConfig rebalanceConfig;
    // Rebalance protocol under test, supplied through the constructor.
    private final ConsumerPartitionAssignor.RebalanceProtocol protocol;
    // Assignors created in the constructor; all three are registered in `assignors` and `assignorMap`.
    private final MockPartitionAssignor partitionAssignor;
    private final ThrowOnAssignmentAssignor throwOnAssignmentAssignor;
    private final ThrowOnAssignmentAssignor throwFatalErrorOnAssignmentAssignor;
    private final List<ConsumerPartitionAssignor> assignors;
    // The same assignors keyed by their name().
    private final Map<String, MockPartitionAssignor> assignorMap;
    // Per-test fixtures; each of the following is re-created by setup().
    private MockClient client;
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;
    private MockCommitCallback mockOffsetCommitCallback;
    private ConsumerCoordinator coordinator;
    // Topic names and the partition 0 of each topic used throughout the tests.
    private final String topic1 = "test1";
    private final String topic2 = "test2";
    private final TopicPartition t1p = new TopicPartition("test1", 0);
    private final TopicPartition t2p = new TopicPartition("test2", 0);
    private final String groupId = "test-group";
    // Instance id available to tests; setup() itself builds the config with Optional.empty().
    private final Optional<String> groupInstanceId = Optional.of("test-instance");
    // Timing settings, in milliseconds as the "Ms" suffix indicates.
    private final int rebalanceTimeoutMs = 60000;
    private final int sessionTimeoutMs = 10000;
    private final int heartbeatIntervalMs = 5000;
    private final long retryBackoffMs = 100;
    private final int autoCommitIntervalMs = 2000;
    private final int requestTimeoutMs = 30000;
    private final int throttleMs = 10;
    // Manually advanced clock shared by the client, the metrics and the coordinator.
    private final MockTime time = new MockTime();
    // Member ids used when building join-group responses.
    private final String consumerId = "consumer";
    private final String consumerId2 = "consumer2";
    // Metadata update covering "test1" and "test2"; presumably the arguments mean one broker and
    // one partition per topic -- confirm against RequestTestUtils.metadataUpdateWith.
    private MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap<String, Integer>() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.1
        {
            put("test1", 1);
            put("test2", 1);
        }
    });
    // First broker returned by metadataResponse; the tests use it as the coordinator node.
    private Node node = (Node) this.metadataResponse.brokers().iterator().next();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest$17, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$17.class */
    // Decompiler rendering of what appears to be a compiler-generated lookup table for a
    // switch over ConsumerPartitionAssignor.RebalanceProtocol: the array is indexed by the
    // enum ordinal and holds the case label (EAGER -> 1, COOPERATIVE -> 2).
    public static /* synthetic */ class AnonymousClass17 {
        // One slot per enum constant; slots that are never assigned keep the default value 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol = new int[ConsumerPartitionAssignor.RebalanceProtocol.values().length];

        static {
            // Each assignment is guarded separately so that a NoSuchFieldError raised for one
            // constant is ignored and does not prevent the other entry from being filled in.
            try {
                $SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol[ConsumerPartitionAssignor.RebalanceProtocol.EAGER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol[ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$MockCommitCallback.class */
    /**
     * Commit callback that only records how often it was invoked and which exception
     * (possibly {@code null}) the most recent invocation received.
     */
    public static class MockCommitCallback implements OffsetCommitCallback {
        /** Number of {@link #onComplete} invocations so far. */
        public int invoked = 0;
        /** Exception handed to the latest {@link #onComplete} invocation; may be {@code null}. */
        public Exception exception = null;

        private MockCommitCallback() {
        }

        /**
         * Records the invocation.
         *
         * @param map the offsets passed by the caller; not inspected
         * @param exc the exception passed by the caller; stored as-is
         */
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.exception = exc;
            this.invoked += 1;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$RackAwareAssignor.class */
    /**
     * EAGER-only mock assignor that requires every member subscription to carry a rack id
     * and remembers each rack id it has seen.
     */
    private static class RackAwareAssignor extends MockPartitionAssignor {
        /** Rack ids collected from the subscriptions passed to {@link #assign}. */
        private final Set<String> rackIds = new HashSet<>();

        RackAwareAssignor() {
            super(Arrays.asList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        }

        /**
         * Collects the rack id of every subscription and then delegates to the parent assignor.
         *
         * @param map  forwarded unchanged to the parent implementation
         * @param map2 subscriptions keyed by member id
         * @return whatever the parent implementation returns
         * @throws IllegalStateException if any subscription has no rack id
         */
        @Override // org.apache.kafka.clients.consumer.internals.MockPartitionAssignor
        public Map<String, List<TopicPartition>> assign(Map<String, Integer> map, Map<String, ConsumerPartitionAssignor.Subscription> map2) {
            map2.forEach((memberId, subscription) -> {
                String rackId = subscription.rackId().orElseThrow(
                    () -> new IllegalStateException("Rack id not provided in subscription for " + memberId));
                this.rackIds.add(rackId);
            });
            return super.assign(map, map2);
        }
    }

    /**
     * Creates the three assignors shared by all tests, each supporting only the given protocol,
     * and registers them both as a list and as a map keyed by assignor name.
     *
     * @param rebalanceProtocol the rebalance protocol the tests run with
     */
    public ConsumerCoordinatorTest(ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol) {
        this.protocol = rebalanceProtocol;
        this.partitionAssignor = new MockPartitionAssignor(Collections.singletonList(rebalanceProtocol));
        this.throwOnAssignmentAssignor = new ThrowOnAssignmentAssignor(
            Collections.singletonList(rebalanceProtocol),
            new KafkaException("Kaboom for assignment!"),
            "throw-on-assignment-assignor");
        this.throwFatalErrorOnAssignmentAssignor = new ThrowOnAssignmentAssignor(
            Collections.singletonList(rebalanceProtocol),
            new IllegalStateException("Illegal state for assignment!"),
            "throw-fatal-error-on-assignment-assignor");
        this.assignors = Arrays.asList(
            this.partitionAssignor,
            this.throwOnAssignmentAssignor,
            this.throwFatalErrorOnAssignmentAssignor);
        this.assignorMap = Utils.mkMap(
            Utils.mkEntry(this.partitionAssignor.name(), this.partitionAssignor),
            Utils.mkEntry(this.throwOnAssignmentAssignor.name(), this.throwOnAssignmentAssignor),
            Utils.mkEntry(this.throwFatalErrorOnAssignmentAssignor.name(), this.throwFatalErrorOnAssignmentAssignor));
    }

    /**
     * Rebuilds every per-test fixture: subscription state, metadata, mock client, network client,
     * metrics, listeners and finally the coordinator under test (auto-commit disabled, no group
     * instance id).
     */
    @BeforeEach
    public void setup() {
        LogContext logContext = new LogContext();
        this.subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
        this.metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient(this.time, this.metadata);
        this.client.updateMetadata(this.metadataResponse);
        // use the declared timing constants rather than repeating their values as literals
        this.consumerClient = new ConsumerNetworkClient(logContext, this.client, this.metadata, this.time,
            this.retryBackoffMs, this.requestTimeoutMs, Integer.MAX_VALUE);
        this.metrics = new Metrics(this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.mockOffsetCommitCallback = new MockCommitCallback();
        // the assignor instance is shared across tests, so drop whatever a previous test prepared
        this.partitionAssignor.clear();
        this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
        this.coordinator = buildCoordinator(this.rebalanceConfig, this.metrics, this.assignors, false, this.subscriptions);
    }

    /**
     * Builds the rebalance configuration from the class-level timing and group constants.
     *
     * @param optional the group instance id to configure, or empty for none
     * @return a config whose last (boolean) argument is {@code true} only when no instance id is given
     */
    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> optional) {
        return new GroupRebalanceConfig(
            this.sessionTimeoutMs,
            this.rebalanceTimeoutMs,
            this.heartbeatIntervalMs,
            this.groupId,
            optional,
            this.retryBackoffMs,
            !optional.isPresent());
    }

    /**
     * Releases the per-test metrics registry and coordinator. The coordinator is closed in a
     * {@code finally} block so that it is still released when closing the metrics throws.
     */
    @AfterEach
    public void teardown() {
        try {
            this.metrics.close();
        } finally {
            this.coordinator.close(this.time.timer(0L));
        }
    }

    /**
     * Checks that the coordinator metrics exist, that the latency sensors feed the avg/max/rate/total
     * metrics with the expected values, and that "assigned-partitions" follows user assignment.
     */
    @Test
    public void testMetrics() {
        String[] expectedMetricNames = {
            "commit-latency-avg",
            "commit-latency-max",
            "commit-rate",
            "commit-total",
            "partition-revoked-latency-avg",
            "partition-revoked-latency-max",
            "partition-assigned-latency-avg",
            "partition-assigned-latency-max",
            "partition-lost-latency-avg",
            "partition-lost-latency-max",
            "assigned-partitions"
        };
        for (String metricName : expectedMetricNames) {
            Assertions.assertNotNull(getMetric(metricName));
        }

        // three commit latency samples: 1, 6 and 2
        for (double sample : new double[] {1.0d, 6.0d, 2.0d}) {
            this.metrics.sensor("commit-latency").record(sample);
        }
        Assertions.assertEquals(3.0d, getMetric("commit-latency-avg").metricValue());
        Assertions.assertEquals(6.0d, getMetric("commit-latency-max").metricValue());
        Assertions.assertEquals(0.1d, getMetric("commit-rate").metricValue());
        Assertions.assertEquals(3.0d, getMetric("commit-total").metricValue());

        // two samples (1 and 2) for each rebalance-callback latency sensor
        String[] callbackSensors = {"partition-revoked-latency", "partition-assigned-latency", "partition-lost-latency"};
        for (String sensorName : callbackSensors) {
            this.metrics.sensor(sensorName).record(1.0d);
            this.metrics.sensor(sensorName).record(2.0d);
        }
        for (String sensorName : callbackSensors) {
            Assertions.assertEquals(1.5d, getMetric(sensorName + "-avg").metricValue());
            Assertions.assertEquals(2.0d, getMetric(sensorName + "-max").metricValue());
        }

        // the gauge reflects the current number of assigned partitions
        Assertions.assertEquals(0.0d, getMetric("assigned-partitions").metricValue());
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        Assertions.assertEquals(1.0d, getMetric("assigned-partitions").metricValue());
        this.subscriptions.assignFromUser(Utils.mkSet(this.t1p, this.t2p));
        Assertions.assertEquals(2.0d, getMetric("assigned-partitions").metricValue());
    }

    /**
     * Looks up a metric of the per-test registry in the "consumertest-group-coordinator-metrics" group.
     *
     * @param str the metric name
     * @return the registered metric, or {@code null} when the registry holds no such metric
     */
    private KafkaMetric getMetric(String str) {
        MetricName metricName = this.metrics.metricName(str, "consumertest-group-coordinator-metrics");
        return this.metrics.metrics().get(metricName);
    }

    /**
     * Verifies how often {@code groupSubscribe} is called while the leader performs an assignment:
     * once when the assigned partitions belong to the subscribed topic only, and twice (the second
     * call covering both topics) when a partition of a topic outside the subscription is assigned.
     * Each coordinator is managed by try-with-resources so it is closed even when an assertion fails.
     */
    @Test
    public void testPerformAssignmentShouldUpdateGroupSubscriptionAfterAssignmentIfNeeded() {
        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);

        // the single member subscribes to topic1 only
        Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap(this.consumerId, Collections.singletonList(this.topic1));
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : memberSubscriptions.entrySet()) {
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(
                new ConsumerPartitionAssignor.Subscription(entry.getValue()));
            members.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(entry.getKey())
                .setMetadata(serialized.array()));
        }

        // case 1: only a partition of the subscribed topic is assigned
        this.partitionAssignor.prepare(Collections.singletonMap(this.consumerId, Collections.singletonList(this.t1p)));
        try (ConsumerCoordinator testCoordinator =
                 buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, false, mockSubscriptionState)) {
            testCoordinator.onLeaderElected("1", this.partitionAssignor.name(), members, false);

            ArgumentCaptor<Collection<String>> topicsCaptor = ArgumentCaptor.forClass(Collection.class);
            Mockito.verify(mockSubscriptionState, Mockito.times(1)).groupSubscribe(topicsCaptor.capture());

            Set<String> expectedTopics = new HashSet<>(Collections.singletonList(this.topic1));
            Assertions.assertEquals(expectedTopics, topicsCaptor.getAllValues().get(0));
        }

        Mockito.clearInvocations(mockSubscriptionState);

        // case 2: a partition of the unsubscribed topic2 is assigned as well
        this.partitionAssignor.prepare(Collections.singletonMap(this.consumerId, Arrays.asList(this.t1p, this.t2p)));
        try (ConsumerCoordinator testCoordinator =
                 buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, false, mockSubscriptionState)) {
            testCoordinator.onLeaderElected("1", this.partitionAssignor.name(), members, false);

            ArgumentCaptor<Collection<String>> topicsCaptor = ArgumentCaptor.forClass(Collection.class);
            Mockito.verify(mockSubscriptionState, Mockito.times(2)).groupSubscribe(topicsCaptor.capture());

            // the second call must carry both topics
            Set<String> expectedTopics = new HashSet<>(Arrays.asList(this.topic1, this.topic2));
            Assertions.assertEquals(expectedTopics, topicsCaptor.getAllValues().get(1));
        }
    }

    /**
     * Serializes a struct with a single INT32 field named "generation".
     *
     * @param i the value stored in the "generation" field
     * @return a buffer holding the serialized struct, flipped after writing
     */
    public ByteBuffer subscriptionUserData(int i) {
        Schema generationSchema = new Schema(new Field("generation", Type.INT32));
        Struct payload = new Struct(generationSchema);
        payload.set("generation", i);

        int serializedSize = generationSchema.sizeOf(payload);
        ByteBuffer buffer = ByteBuffer.allocate(serializedSize);
        generationSchema.write(buffer, payload);
        buffer.flip();
        return buffer;
    }

    /**
     * Builds join-group member metadata for two members that both subscribe to topic1:
     * {@code consumerId} reports t1p and t2p as owned partitions, the other member reports none.
     *
     * @return one response member per consumer, in the iteration order of the backing HashMap
     */
    private List<JoinGroupResponseData.JoinGroupResponseMember> validateCooperativeAssignmentTestSetup() {
        List<String> subscribedTopics = Collections.singletonList(this.topic1);
        Map<String, List<String>> memberSubscriptions = new HashMap<>();
        memberSubscriptions.put(this.consumerId, subscribedTopics);
        memberSubscriptions.put(this.consumerId2, subscribedTopics);

        ConsumerPartitionAssignor.Subscription owningSubscription = new ConsumerPartitionAssignor.Subscription(
            subscribedTopics, subscriptionUserData(1), Arrays.asList(this.t1p, this.t2p));
        ConsumerPartitionAssignor.Subscription emptySubscription = new ConsumerPartitionAssignor.Subscription(
            subscribedTopics, subscriptionUserData(1), Collections.emptyList());

        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : memberSubscriptions.entrySet()) {
            String memberId = entry.getKey();
            // only consumerId claims ownership of partitions
            ConsumerPartitionAssignor.Subscription subscription =
                memberId.equals(this.consumerId) ? owningSubscription : emptySubscription;
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(subscription);
            members.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(memberId)
                .setMetadata(serialized.array()));
        }
        return members;
    }

    /**
     * Prepares an assignment that moves t2p from {@code consumerId} (which reported owning it)
     * to {@code consumerId2}. Under the COOPERATIVE protocol the leader must reject it with an
     * IllegalStateException; under any other protocol the assignment must go through.
     */
    @Test
    public void testPerformAssignmentShouldValidateCooperativeAssignment() {
        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
        List<JoinGroupResponseData.JoinGroupResponseMember> members = validateCooperativeAssignmentTestSetup();

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        assignment.put(this.consumerId, Arrays.asList(this.t1p));
        assignment.put(this.consumerId2, Arrays.asList(this.t2p));
        this.partitionAssignor.prepare(assignment);

        // try-with-resources closes the coordinator exactly once, whether or not an assertion fails
        try (ConsumerCoordinator testCoordinator =
                 buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, false, mockSubscriptionState)) {
            if (this.protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
                IllegalStateException thrown = Assertions.assertThrows(IllegalStateException.class,
                    () -> testCoordinator.onLeaderElected("1", this.partitionAssignor.name(), members, false));
                Assertions.assertTrue(thrown.getMessage().contains(
                    "Assignor supporting the COOPERATIVE protocol violates its requirements"));
            } else {
                testCoordinator.onLeaderElected("1", this.partitionAssignor.name(), members, false);
            }
        }
    }

    /**
     * Calls {@code onLeaderElected} with the skip-assignment flag set and checks that it returns an
     * empty map, that the coordinator still considers itself leader, and that the (mocked)
     * assignor's {@code assign} is never invoked.
     */
    @Test
    public void testOnLeaderElectedShouldSkipAssignment() {
        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
        ConsumerPartitionAssignor mockAssignor = Mockito.mock(ConsumerPartitionAssignor.class);
        String assignorName = "mock-assignor";
        Mockito.when(mockAssignor.name()).thenReturn(assignorName);
        Mockito.when(mockAssignor.supportedProtocols()).thenReturn(Collections.singletonList(this.protocol));

        Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap(this.consumerId, Collections.singletonList(this.topic1));
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : memberSubscriptions.entrySet()) {
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(
                new ConsumerPartitionAssignor.Subscription(entry.getValue()));
            members.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(entry.getKey())
                .setMetadata(serialized.array()));
        }

        // try-with-resources closes the coordinator exactly once, whether or not an assertion fails
        try (ConsumerCoordinator testCoordinator = buildCoordinator(
                 this.rebalanceConfig, new Metrics(), Collections.singletonList(mockAssignor), false, mockSubscriptionState)) {
            Assertions.assertEquals(Collections.emptyMap(), testCoordinator.onLeaderElected("1", assignorName, members, true));
            Assertions.assertTrue(testCoordinator.isLeader());
        }

        Mockito.verify(mockAssignor, Mockito.never()).assign(Mockito.any(), Mockito.any());
    }

    /**
     * Uses the same ownership-changing assignment as the cooperative validation test, but through
     * an assignor whose name is "cooperative-sticky". The test passes when {@code onLeaderElected}
     * completes without throwing, for whichever protocol the test class runs with.
     */
    @Test
    public void testPerformAssignmentShouldSkipValidateCooperativeAssignmentForBuiltInCooperativeStickyAssignor() {
        SubscriptionState mockSubscriptionState = Mockito.mock(SubscriptionState.class);
        List<JoinGroupResponseData.JoinGroupResponseMember> members = validateCooperativeAssignmentTestSetup();

        List<ConsumerPartitionAssignor> assignorsWithCooperativeSticky = new ArrayList<>(this.assignors);
        // mock assignor that only differs from the default one by reporting the built-in name
        MockPartitionAssignor cooperativeStickyAssignor = new MockPartitionAssignor(Collections.singletonList(this.protocol)) {
            @Override
            public String name() {
                return "cooperative-sticky";
            }
        };
        assignorsWithCooperativeSticky.add(cooperativeStickyAssignor);

        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        assignment.put(this.consumerId, Arrays.asList(this.t1p));
        assignment.put(this.consumerId2, Arrays.asList(this.t2p));
        cooperativeStickyAssignor.prepare(assignment);

        // try-with-resources closes the coordinator exactly once, whether or not the call throws
        try (ConsumerCoordinator testCoordinator = buildCoordinator(
                 this.rebalanceConfig, new Metrics(), assignorsWithCooperativeSticky, false, mockSubscriptionState)) {
            testCoordinator.onLeaderElected("1", cooperativeStickyAssignor.name(), members, false);
        }
    }

    /**
     * Checks protocol selection when building a coordinator: one EAGER-only plus one
     * COOPERATIVE-only assignor is rejected with IllegalArgumentException, while two assignors that
     * both support EAGER and COOPERATIVE yield a coordinator reporting COOPERATIVE.
     */
    @Test
    public void testSelectRebalanceProtcol() {
        List<ConsumerPartitionAssignor> candidateAssignors = new ArrayList<>();
        candidateAssignors.add(new MockPartitionAssignor(
            Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.EAGER)));
        candidateAssignors.add(new MockPartitionAssignor(
            Collections.singletonList(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));

        // no protocol is supported by both assignors
        Assertions.assertThrows(IllegalArgumentException.class,
            () -> buildCoordinator(this.rebalanceConfig, new Metrics(), candidateAssignors, false, this.subscriptions));

        candidateAssignors.clear();
        candidateAssignors.add(new MockPartitionAssignor(Arrays.asList(
            ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));
        candidateAssignors.add(new MockPartitionAssignor(Arrays.asList(
            ConsumerPartitionAssignor.RebalanceProtocol.EAGER, ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE)));

        // try-with-resources closes the coordinator exactly once, whether or not the assertion fails
        try (ConsumerCoordinator testCoordinator =
                 buildCoordinator(this.rebalanceConfig, new Metrics(), candidateAssignors, false, this.subscriptions)) {
            Assertions.assertEquals(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE, testCoordinator.getProtocol());
        }
    }

    /**
     * Sends a heartbeat after the coordinator is known and checks that the request stays pending
     * until a response with {@code Errors.NONE} is polled, at which point the future succeeds.
     */
    @Test
    public void testNormalHeartbeat() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        Timer unbounded = this.time.timer(Long.MAX_VALUE);
        this.coordinator.ensureCoordinatorReady(unbounded);

        // advance the mock clock by 10 seconds (the same value as sessionTimeoutMs)
        this.time.sleep(10000L);
        RequestFuture<?> heartbeat = this.coordinator.sendHeartbeatRequest();

        // the request is queued but not yet answered
        Assertions.assertEquals(1, this.consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        this.client.prepareResponse(heartbeatResponse(Errors.NONE));
        Timer immediate = this.time.timer(0L);
        this.consumerClient.poll(immediate);

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.succeeded());
    }

    /**
     * A coordinator lookup answered with {@code GROUP_AUTHORIZATION_FAILED} must surface as a
     * {@link GroupAuthorizationException} from {@code ensureCoordinatorReady}.
     */
    @Test
    public void testGroupDescribeUnauthorized() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.GROUP_AUTHORIZATION_FAILED));
        Assertions.assertThrows(
            GroupAuthorizationException.class,
            () -> this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE)));
    }

    /**
     * With the coordinator known, a join-group response carrying {@code GROUP_AUTHORIZATION_FAILED}
     * must surface as a {@link GroupAuthorizationException} from {@code poll}.
     */
    @Test
    public void testGroupReadUnauthorized() {
        this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);

        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        Timer unbounded = this.time.timer(Long.MAX_VALUE);
        this.coordinator.ensureCoordinatorReady(unbounded);

        this.client.prepareResponse(
            joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        Assertions.assertThrows(
            GroupAuthorizationException.class,
            () -> this.coordinator.poll(this.time.timer(Long.MAX_VALUE)));
    }

    /**
     * With user-assigned partitions, the coordinator must remain unknown both after a
     * {@code COORDINATOR_NOT_AVAILABLE} lookup response and after a later poll for which a
     * successful lookup response has been prepared.
     */
    @Test
    public void testCoordinatorNotAvailableWithUserAssignedType() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));

        // first poll: zero timeout, failing lookup response prepared
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.COORDINATOR_NOT_AVAILABLE));
        Timer immediate = this.time.timer(0L);
        this.coordinator.poll(immediate);
        Assertions.assertTrue(this.coordinator.coordinatorUnknown());

        // second poll: unbounded timeout, successful lookup response prepared;
        // the coordinator is still expected to be unknown afterwards
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        Timer unbounded = this.time.timer(Long.MAX_VALUE);
        this.coordinator.poll(unbounded);
        Assertions.assertTrue(this.coordinator.coordinatorUnknown());
    }

    /**
     * With auto-commit enabled and user-assigned partitions: the first poll sends nothing; once the
     * auto-commit interval has elapsed, a poll puts a request in flight while the coordinator is
     * still unknown; after the coordinator lookup response is delivered the coordinator is known
     * and a request is in flight again.
     */
    @Test
    public void testAutoCommitAsyncWithUserAssignedType() {
        // try-with-resources closes the coordinator exactly once, whether or not an assertion fails
        try (ConsumerCoordinator autoCommitCoordinator =
                 buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true, this.subscriptions)) {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));

            // nothing is due yet, so no request is sent
            autoCommitCoordinator.poll(this.time.timer(0L));
            Assertions.assertTrue(autoCommitCoordinator.coordinatorUnknown());
            Assertions.assertFalse(this.client.hasInFlightRequests());

            // let the auto-commit interval elapse and give the partition a position
            this.time.sleep(this.autoCommitIntervalMs);
            this.subscriptions.seekUnvalidated(this.t1p, new SubscriptionState.FetchPosition(100L));
            autoCommitCoordinator.poll(this.time.timer(0L));
            Assertions.assertTrue(autoCommitCoordinator.coordinatorUnknown());
            Assertions.assertTrue(this.client.hasInFlightRequests());

            // answer the in-flight request with a successful coordinator lookup response
            this.client.respond(groupCoordinatorResponse(this.node, Errors.NONE));
            autoCommitCoordinator.poll(this.time.timer(0L));
            Assertions.assertFalse(autoCommitCoordinator.coordinatorUnknown());
            Assertions.assertTrue(this.client.hasInFlightRequests());
        }
    }

    /**
     * With user-assigned partitions, an explicit async commit must put a request in flight while
     * the coordinator is unknown, and another one once the lookup response has been delivered.
     * The commit callback fails the test if it is ever invoked.
     */
    @Test
    public void testCommitAsyncWithUserAssignedType() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));

        // nothing to do yet: no request is sent
        this.coordinator.poll(this.time.timer(0L));
        Assertions.assertTrue(this.coordinator.coordinatorUnknown());
        Assertions.assertFalse(this.client.hasInFlightRequests());

        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
            Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L));
        this.coordinator.commitOffsetsAsync(offsetsToCommit, (committed, error) -> {
            Assertions.fail("Commit should not get responses, but got offsets:" + committed + ", and exception:" + error);
        });

        // the commit triggers a request even though the coordinator is not known yet
        this.coordinator.poll(this.time.timer(0L));
        Assertions.assertTrue(this.coordinator.coordinatorUnknown());
        Assertions.assertTrue(this.client.hasInFlightRequests());

        // answer it with a successful coordinator lookup response
        this.client.respond(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.poll(this.time.timer(0L));
        Assertions.assertFalse(this.coordinator.coordinatorUnknown());
        Assertions.assertTrue(this.client.hasInFlightRequests());
    }

    /**
     * A heartbeat answered with {@code COORDINATOR_NOT_AVAILABLE} must fail the request future with
     * the matching exception and leave the coordinator marked unknown.
     */
    @Test
    public void testCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        Timer unbounded = this.time.timer(Long.MAX_VALUE);
        this.coordinator.ensureCoordinatorReady(unbounded);

        // advance the mock clock by 10 seconds (the same value as sessionTimeoutMs)
        this.time.sleep(10000L);
        RequestFuture<?> heartbeat = this.coordinator.sendHeartbeatRequest();
        Assertions.assertEquals(1, this.consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        this.client.prepareResponse(heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
        this.time.sleep(10000L);
        Timer immediate = this.time.timer(0L);
        this.consumerClient.poll(immediate);

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.failed());
        Assertions.assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.exception(), heartbeat.exception());
        Assertions.assertTrue(this.coordinator.coordinatorUnknown());
    }

    /**
     * Issues a large number of asynchronous commits, then marks the coordinator unknown and checks
     * that every commit callback runs and sees an exception whose cause is a
     * {@link DisconnectException}.
     */
    @Test
    public void testManyInFlightAsyncCommitsWithCoordinatorDisconnect() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        final int commitCount = 1000;
        final TopicPartition partition = new TopicPartition("foo", 0);
        final AtomicInteger callbacksSeen = new AtomicInteger(0);

        for (int offset = 0; offset < commitCount; offset++) {
            Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(partition, new OffsetAndMetadata(offset));
            coordinator.commitOffsetsAsync(offsets, (committed, failure) -> {
                callbacksSeen.incrementAndGet();
                Throwable rootCause = failure.getCause();
                Assertions.assertTrue(rootCause instanceof DisconnectException,
                    "Unexpected exception cause type: " + (rootCause == null ? null : rootCause.getClass()));
            });
        }

        coordinator.markCoordinatorUnknown("test cause");
        consumerClient.pollNoWakeup();
        // Callbacks are only executed when explicitly drained.
        coordinator.invokeCompletedOffsetCommitCallbacks();
        Assertions.assertEquals(commitCount, callbacksSeen.get());
    }

    /**
     * Sends an offset commit request directly through the consumer client, then marks the
     * coordinator unknown. The composed failure handler must be invoked with a
     * {@link DisconnectException}, and at that moment the coordinator must already be reported as
     * unknown.
     */
    @Test
    public void testCoordinatorUnknownInUnsentCallbacksAfterCoordinatorDead() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        final AtomicBoolean failureHandlerInvoked = new AtomicBoolean(false);

        OffsetCommitRequestData.OffsetCommitRequestPartition partitionData =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedLeaderEpoch(-1)
                .setCommittedMetadata("")
                .setCommittedOffset(13L)
                .setCommitTimestamp(0L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topicData =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("foo")
                .setPartitions(Collections.singletonList(partitionData));
        OffsetCommitRequestData requestData = new OffsetCommitRequestData()
            .setGroupId("test-group")
            .setTopics(Collections.singletonList(topicData));

        consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(requestData))
            .compose(new RequestFutureAdapter<ClientResponse, Object>() {
                @Override
                public void onSuccess(ClientResponse response, RequestFuture<Object> future) {
                    // Nothing to do: this test only exercises the failure path.
                }

                @Override
                public void onFailure(RuntimeException failure, RequestFuture<Object> future) {
                    Assertions.assertTrue(failure instanceof DisconnectException,
                        "Unexpected exception type: " + failure.getClass());
                    Assertions.assertTrue(ConsumerCoordinatorTest.this.coordinator.coordinatorUnknown());
                    failureHandlerInvoked.set(true);
                }
            });

        coordinator.markCoordinatorUnknown("test cause");
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(failureHandlerInvoked.get());
    }

    /**
     * Verifies that a heartbeat answered with {@code NOT_COORDINATOR} fails the request future
     * with that error's exception and leaves the coordinator marked as unknown.
     */
    @Test
    public void testNotCoordinator() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        time.sleep(10000L);
        RequestFuture<Void> heartbeat = coordinator.sendHeartbeatRequest();
        // Exactly one request is pending and its future has not completed yet.
        Assertions.assertEquals(1, consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
        time.sleep(10000L);
        consumerClient.poll(time.timer(0L));

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.failed());
        Assertions.assertEquals(Errors.NOT_COORDINATOR.exception(), heartbeat.exception());
        Assertions.assertTrue(coordinator.coordinatorUnknown());
    }

    /**
     * Verifies that a heartbeat answered with {@code ILLEGAL_GENERATION} fails the request future,
     * flags that a rejoin is needed, and that the following coordinator poll reports the assigned
     * partition as lost through the rebalance listener.
     */
    @Test
    public void testIllegalGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Put the consumer into a state where it owns a partition from a group subscription.
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        subscriptions.assignFromSubscribed(Collections.singletonList(t1p));

        time.sleep(10000L);
        RequestFuture<Void> heartbeat = coordinator.sendHeartbeatRequest();
        Assertions.assertEquals(1, consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION));
        time.sleep(10000L);
        consumerClient.poll(time.timer(0L));

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.failed());
        Assertions.assertEquals(Errors.ILLEGAL_GENERATION.exception(), heartbeat.exception());
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());

        // The next poll is expected to trigger the "lost" callback for the owned partition.
        coordinator.poll(time.timer(0L));
        Assertions.assertEquals(1, rebalanceListener.lostCount);
        Assertions.assertEquals(Collections.singleton(t1p), rebalanceListener.lost);
    }

    /**
     * Completes a join directly via {@code onJoinComplete} and then calls {@code onLeavePrepare},
     * asserting that the listener saw exactly one "lost" callback and no "revoked" callback.
     */
    @Test
    public void testUnsubscribeWithValidGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);

        String assignorName = partitionAssignor.name();
        ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(
            Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0]));
        ByteBuffer serializedAssignment = ConsumerProtocol.serializeAssignment(assignment);
        coordinator.onJoinComplete(1, "memberId", assignorName, serializedAssignment);

        coordinator.onLeavePrepare();

        Assertions.assertEquals(1, rebalanceListener.lostCount);
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
    }

    /**
     * Uses a listener whose revoke callback throws a {@link KafkaException} together with the
     * {@code throwOnAssignmentAssignor}. Under the COOPERATIVE protocol the revoke message is
     * expected to surface; otherwise the message {@code "Kaboom for assignment!"} is expected.
     */
    @Test
    public void testRevokeExceptionThrownFirstNonBlockingSubCallbacks() {
        MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                super.onPartitionsRevoked(partitions);
                throw new KafkaException("Kaboom on revoke!");
            }
        };

        boolean cooperative = protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
        String expectedMessage = cooperative ? "Kaboom on revoke!" : "Kaboom for assignment!";
        verifyOnCallbackExceptions(throwOnRevokeListener, throwOnAssignmentAssignor.name(), expectedMessage, null);
    }

    /**
     * Uses a listener whose assign callback throws a {@link KafkaException} together with the
     * {@code throwOnAssignmentAssignor}; the surfaced message is expected to be
     * {@code "Kaboom for assignment!"} rather than the listener's own message.
     */
    @Test
    public void testOnAssignmentExceptionThrownFirstNonBlockingSubCallbacks() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };

        verifyOnCallbackExceptions(
            throwOnAssignListener,
            throwOnAssignmentAssignor.name(),
            "Kaboom for assignment!",
            null);
    }

    /**
     * Uses a listener whose assign callback throws a {@link KafkaException} together with the
     * regular {@code partitionAssignor}; the listener's own message is expected to surface.
     */
    @Test
    public void testOnPartitionsAssignExceptionThrownWhenNoPreviousThrownCallbacks() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };

        verifyOnCallbackExceptions(
            throwOnAssignListener,
            partitionAssignor.name(),
            "Kaboom on partition assign!",
            null);
    }

    /**
     * Uses a listener whose revoke callback throws an {@link IllegalStateException} together with
     * the {@code throwOnAssignmentAssignor}. Under the COOPERATIVE protocol the error is expected
     * to be wrapped with the message {@code "User rebalance callback throws an error"} and the
     * original message as cause; otherwise {@code "Kaboom for assignment!"} is expected.
     */
    @Test
    public void testOnRevokeExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnRevokeListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                super.onPartitionsRevoked(partitions);
                throw new IllegalStateException("Illegal state on partition revoke!");
            }
        };

        if (protocol != ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE) {
            verifyOnCallbackExceptions(throwOnRevokeListener, throwOnAssignmentAssignor.name(),
                "Kaboom for assignment!", null);
            return;
        }
        verifyOnCallbackExceptions(throwOnRevokeListener, throwOnAssignmentAssignor.name(),
            "User rebalance callback throws an error", "Illegal state on partition revoke!");
    }

    /**
     * Uses a listener whose assign callback throws a {@link KafkaException} together with the
     * {@code throwFatalErrorOnAssignmentAssignor}. The surfaced exception is expected to carry the
     * message {@code "User rebalance callback throws an error"} with a cause whose message is
     * {@code "Illegal state for assignment!"}.
     */
    @Test
    public void testOnAssignmentExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new KafkaException("Kaboom on partition assign!");
            }
        };

        verifyOnCallbackExceptions(
            throwOnAssignListener,
            throwFatalErrorOnAssignmentAssignor.name(),
            "User rebalance callback throws an error",
            "Illegal state for assignment!");
    }

    /**
     * Uses a listener whose assign callback throws an {@link IllegalStateException} together with
     * the regular {@code partitionAssignor}. The surfaced exception is expected to carry the
     * message {@code "User rebalance callback throws an error"} with the listener's message on its
     * cause.
     */
    @Test
    public void testOnPartitionsAssignExceptionShouldBeRenderedIfNotKafkaException() {
        MockRebalanceListener throwOnAssignListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                super.onPartitionsAssigned(partitions);
                throw new IllegalStateException("Illegal state on partition assign!");
            }
        };

        verifyOnCallbackExceptions(
            throwOnAssignListener,
            partitionAssignor.name(),
            "User rebalance callback throws an error",
            "Illegal state on partition assign!");
    }

    /**
     * Drives {@code onJoinComplete} with the given listener and assignor and checks both the thrown
     * {@link KafkaException} and the resulting callback/assignment counters.
     *
     * @param mockRebalanceListener listener registered with the subscription; its revoked, lost and
     *                              assigned counters are asserted afterwards
     * @param assignorName          name of the assignor handed to {@code onJoinComplete}; must be a
     *                              key of {@code assignorMap}
     * @param expectedMessage       expected message of the thrown exception; when {@code null},
     *                              {@code onJoinComplete} is not invoked at all
     * @param expectedCauseMessage  expected message of the exception's cause, or {@code null} to
     *                              skip the cause check
     */
    private void verifyOnCallbackExceptions(MockRebalanceListener mockRebalanceListener, String assignorName, String expectedMessage, String expectedCauseMessage) {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
        subscriptions.subscribe(Collections.singleton("test1"), mockRebalanceListener);

        // The new assignment contains t1p while the consumer currently owns t2p.
        ByteBuffer serializedAssignment = ConsumerProtocol.serializeAssignment(
            new ConsumerPartitionAssignor.Assignment(Collections.singletonList(t1p), ByteBuffer.wrap(new byte[0])));
        subscriptions.assignFromSubscribed(Collections.singleton(t2p));

        if (expectedMessage != null) {
            KafkaException thrown = Assertions.assertThrows(KafkaException.class,
                () -> coordinator.onJoinComplete(1, "memberId", assignorName, serializedAssignment));
            Assertions.assertEquals(expectedMessage, thrown.getMessage());
            if (expectedCauseMessage != null) {
                // Fail with a readable message instead of a NullPointerException when no cause is attached.
                Assertions.assertNotNull(thrown.getCause(),
                    "Expected a cause with message: " + expectedCauseMessage);
                Assertions.assertEquals(expectedCauseMessage, thrown.getCause().getMessage());
            }
        }

        // Only the COOPERATIVE protocol is expected to have invoked the revoke callback here.
        int expectedRevokedCount = protocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE ? 1 : 0;
        Assertions.assertEquals(expectedRevokedCount, mockRebalanceListener.revokedCount);
        Assertions.assertEquals(0, mockRebalanceListener.lostCount);
        Assertions.assertEquals(1, mockRebalanceListener.assignedCount);
        Assertions.assertTrue(assignorMap.containsKey(assignorName), "Unknown assignor name: " + assignorName);
        Assertions.assertEquals(1, assignorMap.get(assignorName).numAssignment());
    }

    /**
     * Assigns a partition from the subscription without completing any join, then calls
     * {@code onLeavePrepare} and asserts exactly one "lost" callback and no "revoked" callback.
     */
    @Test
    public void testUnsubscribeWithInvalidGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        subscriptions.assignFromSubscribed(Collections.singletonList(t1p));

        coordinator.onLeavePrepare();

        Assertions.assertEquals(1, rebalanceListener.lostCount);
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
    }

    /**
     * Verifies that a heartbeat answered with {@code UNKNOWN_MEMBER_ID} fails the request future,
     * flags that a rejoin is needed, and that the following coordinator poll reports the assigned
     * partition as lost through the rebalance listener.
     */
    @Test
    public void testUnknownMemberId() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Put the consumer into a state where it owns a partition from a group subscription.
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        subscriptions.assignFromSubscribed(Collections.singletonList(t1p));

        time.sleep(10000L);
        RequestFuture<Void> heartbeat = coordinator.sendHeartbeatRequest();
        Assertions.assertEquals(1, consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
        time.sleep(10000L);
        consumerClient.poll(time.timer(0L));

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.failed());
        Assertions.assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), heartbeat.exception());
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());

        // The next poll is expected to trigger the "lost" callback for the owned partition.
        coordinator.poll(time.timer(0L));
        Assertions.assertEquals(1, rebalanceListener.lostCount);
        Assertions.assertEquals(Collections.singleton(t1p), rebalanceListener.lost);
    }

    /**
     * Verifies that a heartbeat whose prepared response is flagged as a disconnect fails the
     * request future with a {@link DisconnectException} and leaves the coordinator marked as
     * unknown.
     */
    @Test
    public void testCoordinatorDisconnect() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        time.sleep(10000L);
        RequestFuture<Void> heartbeat = coordinator.sendHeartbeatRequest();
        // Exactly one request is pending and its future has not completed yet.
        Assertions.assertEquals(1, consumerClient.pendingRequestCount());
        Assertions.assertFalse(heartbeat.isDone());

        // The second argument marks the prepared response as a disconnect.
        client.prepareResponse(heartbeatResponse(Errors.NONE), true);
        time.sleep(10000L);
        consumerClient.poll(time.timer(0L));

        Assertions.assertTrue(heartbeat.isDone());
        Assertions.assertTrue(heartbeat.failed());
        Assertions.assertTrue(heartbeat.exception() instanceof DisconnectException);
        Assertions.assertTrue(coordinator.coordinatorUnknown());
    }

    /**
     * Verifies that a join-group response carrying {@code INVALID_GROUP_ID} makes the coordinator
     * poll throw an {@link ApiException}.
     */
    @Test
    public void testJoinGroupInvalidGroupId() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        // Make sure metadata for the subscribed topic is available before joining.
        client.updateMetadata(metadataResponse);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(
            joinGroupLeaderResponse(0, "leader", Collections.emptyMap(), Errors.INVALID_GROUP_ID));
        Assertions.assertThrows(ApiException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE)));
    }

    /**
     * Runs a join/sync round in which this consumer is the group leader and checks the resulting
     * assignment, the metadata topics, and the rebalance listener callbacks (no revocation, one
     * assignment).
     */
    @Test
    public void testNormalJoinGroupLeader() {
        final Set<String> subscription = Collections.singleton("test1");
        final List<TopicPartition> owned = Collections.emptyList();
        final List<TopicPartition> assigned = Arrays.asList(t1p);

        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        // Make sure metadata for the subscribed topic is available before joining.
        client.updateMetadata(metadataResponse);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Join as leader, with the assignor primed to hand t1p to the leader.
        Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap("leader", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("leader", assigned));
        client.prepareResponse(joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            return sync.data().memberId().equals("leader")
                && sync.data().generationId() == 1
                && sync.groupAssignments().containsKey("leader");
        }, syncGroupResponse(assigned, Errors.NONE));

        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(assigned), subscriptions.assignedPartitions());
        Assertions.assertEquals(subscription, subscriptions.metadataTopics());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
    }

    /**
     * Prepares two join/sync rounds, the first for the old subscription ("test2") and the second
     * for the new one ("test1"), and changes the subscription between polls. After the final poll
     * the consumer must hold only the assignment for the new subscription, with exactly one
     * assignment callback.
     */
    @Test
    public void testOutdatedCoordinatorAssignment() {
        final String consumerId = "outdated_assignment";
        final List<TopicPartition> owned = Collections.emptyList();
        final List<String> oldSubscription = Collections.singletonList("test2");
        final List<TopicPartition> oldAssignment = Arrays.asList(t2p);
        final List<String> newSubscription = Collections.singletonList("test1");
        final List<TopicPartition> newAssignment = Arrays.asList(t1p);

        subscriptions.subscribe(TestUtils.toSet(oldSubscription), rebalanceListener);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // The assignor is primed with the assignment belonging to the new subscription.
        partitionAssignor.prepare(Collections.singletonMap(consumerId, newAssignment));

        // First round: join and sync responses for the old subscription.
        client.prepareResponse(joinGroupLeaderResponse(
            1, consumerId, Collections.singletonMap(consumerId, oldSubscription), Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            return sync.data().memberId().equals(consumerId)
                && sync.data().generationId() == 1
                && sync.groupAssignments().containsKey(consumerId);
        }, syncGroupResponse(oldAssignment, Errors.NONE));

        // Second round: join and sync responses for the new subscription.
        client.prepareResponse(joinGroupLeaderResponse(
            1, consumerId, Collections.singletonMap(consumerId, newSubscription), Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            return sync.data().memberId().equals(consumerId)
                && sync.data().generationId() == 1
                && sync.groupAssignments().containsKey(consumerId);
        }, syncGroupResponse(newAssignment, Errors.NONE));

        // Start the rebalance, then switch the subscription before it is driven to completion.
        coordinator.poll(time.timer(0L));
        subscriptions.subscribe(TestUtils.toSet(newSubscription), rebalanceListener);
        coordinator.poll(time.timer(0L));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        final Collection<TopicPartition> assigned = getAdded(owned, newAssignment);

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(newAssignment), subscriptions.assignedPartitions());
        Assertions.assertEquals(TestUtils.toSet(newSubscription), subscriptions.metadataTopics());
        int expectedRevokedCount = protocol == ConsumerPartitionAssignor.RebalanceProtocol.EAGER ? 1 : 0;
        Assertions.assertEquals(expectedRevokedCount, rebalanceListener.revokedCount);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(assigned, rebalanceListener.assigned);
    }

    /**
     * Tracks {@code metadataTopics()} across a subscription change: it equals the first topic
     * before and after the first rebalance, covers both topics right after re-subscribing, and
     * equals only the second topic once the second rebalance has completed.
     */
    @Test
    public void testMetadataTopicsDuringSubscriptionChange() {
        final String consumerId = "subscription_change";
        final List<String> firstTopics = Collections.singletonList("test1");
        final List<TopicPartition> firstPartitions = Collections.singletonList(t1p);
        final List<String> secondTopics = Collections.singletonList("test2");
        final List<TopicPartition> secondPartitions = Collections.singletonList(t2p);

        subscriptions.subscribe(TestUtils.toSet(firstTopics), rebalanceListener);
        Assertions.assertEquals(TestUtils.toSet(firstTopics), subscriptions.metadataTopics());

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First rebalance for the initial subscription.
        prepareJoinAndSyncResponse(consumerId, 1, firstTopics, firstPartitions);
        coordinator.poll(time.timer(0L));
        Assertions.assertEquals(TestUtils.toSet(firstTopics), subscriptions.metadataTopics());

        // Immediately after re-subscribing, metadata is expected for both old and new topics.
        subscriptions.subscribe(TestUtils.toSet(secondTopics), rebalanceListener);
        Assertions.assertEquals(Utils.mkSet("test1", "test2"), subscriptions.metadataTopics());

        // Second rebalance for the new subscription.
        prepareJoinAndSyncResponse(consumerId, 2, secondTopics, secondPartitions);
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(secondPartitions), subscriptions.assignedPartitions());
        Assertions.assertEquals(TestUtils.toSet(secondTopics), subscriptions.metadataTopics());
    }

    /**
     * Runs a leader join/sync round under a pattern subscription where the initial metadata only
     * contains "test1" and a further metadata update is prepared before polling. Afterwards two
     * partitions must be assigned, two topics subscribed, and the listener must have seen one
     * assignment and no revocation.
     */
    @Test
    public void testPatternJoinGroupLeader() {
        final List<TopicPartition> assigned = Arrays.asList(t1p, t2p);
        final List<TopicPartition> owned = Collections.emptyList();

        subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);

        // Partial metadata containing just one of the topics.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Join as leader, with the assignor primed to hand both partitions to the leader.
        Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap("leader", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("leader", assigned));
        client.prepareResponse(joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            boolean sentByLeader = sync.data().memberId().equals("leader");
            return sentByLeader
                && sync.data().generationId() == 1
                && sync.groupAssignments().containsKey("leader");
        }, syncGroupResponse(assigned, Errors.NONE));

        // Queue the metadata update from the shared fixture before polling.
        client.prepareMetadataUpdate(metadataResponse);

        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(2, subscriptions.numAssignedPartitions());
        Assertions.assertEquals(2, subscriptions.metadataTopics().size());
        Assertions.assertEquals(2, subscriptions.subscription().size());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
    }

    /**
     * Exercises three consecutive rebalances under a match-all pattern subscription while the
     * cluster metadata changes inside the sync-request matchers:
     * <ol>
     *   <li>join with only "test1" known; metadata grows to "test1" and "test2" during the sync;</li>
     *   <li>rejoin advertising both topics; metadata shrinks back to "test1" during the sync;</li>
     *   <li>rejoin advertising "test1" and end up with the original single-partition assignment.</li>
     * </ol>
     * After each round the subscription, the assignment and the listener counters are checked.
     */
    @Test
    public void testMetadataRefreshDuringRebalance() {
        final List<TopicPartition> owned = Collections.emptyList();
        final List<TopicPartition> oldAssigned = Collections.singletonList(t1p);

        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        coordinator.maybeUpdateSubscriptionMetadata();
        Assertions.assertEquals(Collections.singleton("test1"), subscriptions.subscription());

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Round 1: join as leader with the single known topic.
        final Map<String, List<String>> initialSubscription =
            Collections.singletonMap("leader", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("leader", oldAssigned));

        final List<String> updatedTopics = Arrays.asList("test1", "test2");

        client.prepareResponse(joinGroupLeaderResponse(1, "leader", initialSubscription, Errors.NONE));
        client.prepareResponse(request -> {
            // While matching the sync request, publish metadata that contains both topics.
            Map<String, Integer> partitionCounts = new HashMap<>();
            for (String topic : updatedTopics) {
                partitionCounts.put(topic, 1);
            }
            client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, partitionCounts));
            return true;
        }, syncGroupResponse(oldAssigned, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        // The subscription still reflects the metadata seen when the rebalance started.
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton("test1"), subscriptions.subscription());
        Assertions.assertEquals(TestUtils.toSet(oldAssigned), subscriptions.assignedPartitions());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(owned, oldAssigned), rebalanceListener.assigned);

        // Round 2: the join request must advertise both topics.
        final List<TopicPartition> newAssigned = Arrays.asList(t1p, t2p);
        final Map<String, List<String>> updatedSubscription =
            Collections.singletonMap("leader", Arrays.asList("test1", "test2"));
        partitionAssignor.prepare(Collections.singletonMap("leader", newAssigned));

        client.prepareResponse(request -> {
            Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocols =
                ((JoinGroupRequest) request).data().protocols().iterator();
            Assertions.assertTrue(protocols.hasNext());
            ByteBuffer protocolMetadata = ByteBuffer.wrap(protocols.next().metadata());
            ConsumerPartitionAssignor.Subscription advertised =
                ConsumerProtocol.deserializeSubscription(protocolMetadata);
            protocolMetadata.rewind();
            return advertised.topics().containsAll(updatedTopics);
        }, joinGroupLeaderResponse(2, "leader", updatedSubscription, Errors.NONE));
        client.prepareResponse(request -> {
            // While matching the sync request, shrink the metadata back to the single topic.
            client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
            return true;
        }, syncGroupResponse(newAssigned, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        final Collection<TopicPartition> revokedInRound2 = getRevoked(oldAssigned, newAssigned);
        final int revokedCountAfterRound2 = revokedInRound2.isEmpty() ? 0 : 1;

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(updatedTopics), subscriptions.subscription());
        Assertions.assertEquals(TestUtils.toSet(newAssigned), subscriptions.assignedPartitions());
        Assertions.assertEquals(revokedCountAfterRound2, rebalanceListener.revokedCount);
        Assertions.assertEquals(revokedInRound2.isEmpty() ? null : revokedInRound2, rebalanceListener.revoked);
        Assertions.assertEquals(2, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(oldAssigned, newAssigned), rebalanceListener.assigned);

        // Round 3: the join request must advertise "test1"; the assignment returns to t1p only.
        partitionAssignor.prepare(Collections.singletonMap("leader", oldAssigned));

        client.prepareResponse(request -> {
            Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocols =
                ((JoinGroupRequest) request).data().protocols().iterator();
            Assertions.assertTrue(protocols.hasNext());
            ByteBuffer protocolMetadata = ByteBuffer.wrap(protocols.next().metadata());
            ConsumerPartitionAssignor.Subscription advertised =
                ConsumerProtocol.deserializeSubscription(protocolMetadata);
            protocolMetadata.rewind();
            return advertised.topics().contains("test1");
        }, joinGroupLeaderResponse(3, "leader", initialSubscription, Errors.NONE));
        client.prepareResponse(syncGroupResponse(oldAssigned, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        final Collection<TopicPartition> revokedInRound3 = getRevoked(newAssigned, oldAssigned);
        Assertions.assertFalse(revokedInRound3.isEmpty());
        final Collection<TopicPartition> addedInRound3 = getAdded(newAssigned, oldAssigned);

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton("test1"), subscriptions.subscription());
        Assertions.assertEquals(TestUtils.toSet(oldAssigned), subscriptions.assignedPartitions());
        Assertions.assertEquals(revokedCountAfterRound2 + 1, rebalanceListener.revokedCount);
        // revokedInRound3 was asserted non-empty above, so it is compared directly.
        Assertions.assertEquals(revokedInRound3, rebalanceListener.revoked);
        Assertions.assertEquals(3, rebalanceListener.assignedCount);
        Assertions.assertEquals(addedInRound3, rebalanceListener.assigned);
        Assertions.assertEquals(0, rebalanceListener.lostCount);
    }

    /**
     * Joins as a follower under a match-all pattern subscription while a metadata update is queued
     * on the client. After the rebalance the subscription must contain both "test1" and "test2",
     * and a further metadata update must not leave a rejoin needed or pending.
     */
    @Test
    public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
        subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        coordinator.maybeUpdateSubscriptionMetadata();
        Assertions.assertEquals(Collections.singleton("test1"), subscriptions.subscription());

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Queue the metadata update from the shared fixture ahead of the join/sync round.
        client.prepareMetadataUpdate(metadataResponse);

        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            // A follower sends no group assignments in its sync request.
            return sync.data().memberId().equals("consumer")
                && sync.data().generationId() == 1
                && sync.groupAssignments().isEmpty();
        }, syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));

        partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(t1p)));

        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(new HashSet<>(Arrays.asList("test1", "test2")), subscriptions.subscription());

        // Request one more metadata update and make sure it does not trigger another rebalance.
        metadata.requestUpdate();
        consumerClient.poll(time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
    }

    /**
     * Starts with a {@code test.*} pattern subscription matching {@code test1} and {@code test2},
     * queues a metadata update that only lists {@code test1} while joining as a follower, and
     * expects the subscription to shrink to {@code test1} with no rejoin pending afterwards.
     *
     * <p>The coordinator under test is built locally and closed by try-with-resources.
     */
    @Test
    public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
        try (ConsumerCoordinator patternCoordinator =
                     buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);

            Map<String, Integer> bothTopics = new HashMap<>();
            bothTopics.put("test1", 1);
            bothTopics.put("test2", 1);
            client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, bothTopics));
            patternCoordinator.maybeUpdateSubscriptionMetadata();
            Assertions.assertEquals(new HashSet<>(Arrays.asList("test1", "test2")), subscriptions.subscription());

            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            patternCoordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

            // The queued update no longer contains test2.
            Map<String, Integer> remainingTopic = new HashMap<>();
            remainingTopic.put("test1", 1);
            client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(1, remainingTopic));
            client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            client.prepareResponse(request -> {
                SyncGroupRequest sync = (SyncGroupRequest) request;
                return sync.data().memberId().equals("consumer")
                        && sync.data().generationId() == 1
                        && sync.groupAssignments().isEmpty();
            }, syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
            partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(t1p)));

            patternCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(Collections.singleton("test1"), subscriptions.subscription());

            metadata.requestUpdate();
            consumerClient.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertFalse(patternCoordinator.rejoinNeededOrPending());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code false} when the prepared offset commit
     * response carries {@code UNKNOWN_TOPIC_OR_PARTITION}, and {@code true} on a second call
     * whose prepared response carries {@code NONE}. Afterwards no responses or requests may be
     * outstanding and the coordinator must still be known.
     */
    @Test
    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
            // First attempt: the commit fails, so join preparation is not complete.
            Timer firstAttempt = time.timer(100L);
            client.prepareResponse(offsetCommitResponse(
                    Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
            Assertions.assertFalse(joiningCoordinator.onJoinPrepare(firstAttempt, 42, "consumer-42"));

            // Second attempt: the commit succeeds.
            Timer secondAttempt = time.timer(100L);
            client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
            Assertions.assertTrue(joiningCoordinator.onJoinPrepare(secondAttempt, 42, "consumer-42"));

            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code true} even though the prepared offset
     * commit response carries {@code UNKNOWN_MEMBER_ID}, leaving no pending responses or
     * in-flight requests and the coordinator still known.
     */
    @Test
    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
            Timer joinTimer = time.timer(100L);
            client.prepareResponse(offsetCommitResponse(
                    Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));

            Assertions.assertTrue(joiningCoordinator.onJoinPrepare(joinTimer, 42, "consumer-42"));

            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects a first {@code onJoinPrepare} call with a zero timer to return {@code false};
     * after the mock clock is advanced by 60 seconds and an {@code UNKNOWN_TOPIC_OR_PARTITION}
     * offset commit response is delivered, a second call is expected to return {@code true}.
     */
    @Test
    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
            Assertions.assertFalse(joiningCoordinator.onJoinPrepare(time.timer(0L), 42, "consumer-42"));

            // The timer is created before the clock is advanced.
            Timer retryTimer = time.timer(100L);
            time.sleep(60000L);
            // respond() answers the request already sent by the first onJoinPrepare call.
            client.respond(offsetCommitResponse(
                    Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
            Assertions.assertTrue(joiningCoordinator.onJoinPrepare(retryTimer, 42, "consumer-42"));

            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code true} immediately and to leave the
     * prepared offset commit response unconsumed (it is still pending afterwards).
     *
     * <p>NOTE(review): the second argument to {@code prepareCoordinatorForCloseTest} is
     * {@code false} here, presumably the auto-commit flag; confirm against the helper.
     */
    @Test
    public void testJoinPrepareWithDisableAutoCommit() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
            joiningCoordinator.ensureActiveGroup();
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

            Assertions.assertTrue(joiningCoordinator.onJoinPrepare(time.timer(0L), 42, "consumer-42"));

            // The prepared commit response must still be waiting: no commit was sent.
            Assertions.assertTrue(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code true} when the prepared offset commit
     * response carries {@code NONE}, with the response consumed, nothing in flight and the
     * coordinator still known.
     */
    @Test
    public void testJoinPrepareAndCommitCompleted() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"), true)) {
            joiningCoordinator.ensureActiveGroup();
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

            boolean prepared = joiningCoordinator.onJoinPrepare(time.timer(0L), 42, "consumer-42");
            joiningCoordinator.invokeCompletedOffsetCommitCallbacks();

            Assertions.assertTrue(prepared);
            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code false} when the prepared offset commit
     * response carries {@code COORDINATOR_NOT_AVAILABLE}, and the coordinator to be reported
     * as unknown afterwards.
     */
    @Test
    public void testJoinPrepareAndCommitWithCoordinatorNotAvailable() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"), true)) {
            joiningCoordinator.ensureActiveGroup();
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);

            boolean prepared = joiningCoordinator.onJoinPrepare(time.timer(0L), 42, "consumer-42");
            joiningCoordinator.invokeCompletedOffsetCommitCallbacks();

            Assertions.assertFalse(prepared);
            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertTrue(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Expects {@code onJoinPrepare} to return {@code true} when the prepared offset commit
     * response carries {@code UNKNOWN_MEMBER_ID}, with the coordinator still known afterwards.
     */
    @Test
    public void testJoinPrepareAndCommitWithUnknownMemberId() {
        try (ConsumerCoordinator joiningCoordinator =
                     prepareCoordinatorForCloseTest(true, true, Optional.of("group-id"), true)) {
            joiningCoordinator.ensureActiveGroup();
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);

            boolean prepared = joiningCoordinator.onJoinPrepare(time.timer(0L), 42, "consumer-42");
            joiningCoordinator.invokeCompletedOffsetCommitCallbacks();

            Assertions.assertTrue(prepared);
            Assertions.assertFalse(client.hasPendingResponses());
            Assertions.assertFalse(client.hasInFlightRequests());
            Assertions.assertFalse(joiningCoordinator.coordinatorUnknown());
        }
    }

    /**
     * Completes a first rebalance as leader for {@code test1} and {@code test2}, then changes
     * the cluster metadata twice around a join attempt that fails with {@code NOT_COORDINATOR},
     * and finally completes a second rebalance (generation 2). Checks the listener's
     * revoked/assigned/lost bookkeeping after each phase.
     */
    @Test
    public void testRebalanceWithMetadataChange() {
        List<String> topics = Arrays.asList("test1", "test2");
        List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
        subscriptions.subscribe(TestUtils.toSet(topics), rebalanceListener);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
                Utils.mkMap(Utils.mkEntry("test1", 1), Utils.mkEntry("test2", 1))));
        coordinator.maybeUpdateSubscriptionMetadata();

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First rebalance: this member is the leader and receives both partitions.
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", topics);
        partitionAssignor.prepare(Collections.singletonMap("leader", partitions));
        client.prepareResponse(joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(topics), subscriptions.subscription());
        Assertions.assertEquals(TestUtils.toSet(partitions), subscriptions.assignedPartitions());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);

        // Metadata now lists only test1; a zero-timeout poll follows.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        coordinator.poll(time.timer(0L));

        // Metadata lists both topics again, and the outstanding join is answered with NOT_COORDINATOR.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1,
                Utils.mkMap(Utils.mkEntry("test1", 1), Utils.mkEntry("test2", 1))));
        client.respond(joinGroupFollowerResponse(1, "leader", "leader", Errors.NOT_COORDINATOR));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.poll(time.timer(0L));
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());

        // Second rebalance: only a JoinGroup request from member "leader" is answered.
        client.respond(
                request -> request instanceof JoinGroupRequest
                        && "leader".equals(((JoinGroupRequest) request).data().memberId()),
                joinGroupLeaderResponse(2, "leader", memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());

        Collection<TopicPartition> revoked = getRevoked(partitions, partitions);
        Assertions.assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
        Assertions.assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
        Assertions.assertEquals(0, rebalanceListener.lostCount);
        Assertions.assertNull(rebalanceListener.lost);

        Collection<TopicPartition> added = getAdded(partitions, partitions);
        Assertions.assertEquals(2, rebalanceListener.assignedCount);
        Assertions.assertEquals(added.isEmpty() ? Collections.emptySet() : TestUtils.toSet(partitions),
                rebalanceListener.assigned);
        Assertions.assertEquals(TestUtils.toSet(partitions), subscriptions.assignedPartitions());
    }

    /**
     * Triggers a wakeup on the consumer client before polling during a join, tolerates the
     * resulting {@code WakeupException}, and expects a subsequent poll to finish the rebalance
     * with {@code t1p} assigned and a single assignment callback.
     */
    @Test
    public void testWakeupDuringJoin() {
        List<TopicPartition> ownedBefore = Collections.emptyList();
        List<TopicPartition> assignedAfter = Collections.singletonList(t1p);

        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.updateMetadata(metadataResponse);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        Map<String, List<String>> memberSubscriptions =
                Collections.singletonMap("leader", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("leader", assignedAfter));
        client.prepareResponse(joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));

        consumerClient.wakeup();
        try {
            coordinator.poll(time.timer(Long.MAX_VALUE));
        } catch (WakeupException ignored) {
            // Deliberately swallowed: the test continues with a second poll either way.
        }

        // Only now is the sync response made available.
        client.prepareResponse(syncGroupResponse(assignedAfter, Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(assignedAfter), subscriptions.assignedPartitions());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(ownedBefore, assignedAfter), rebalanceListener.assigned);
    }

    /**
     * Joins as a follower subscribed to {@code test1}, checks that the SyncGroup request is
     * sent by member {@code consumer} in generation 1 with no assignments, and expects
     * {@code t1p} to be assigned with exactly one assignment callback and no revocation.
     */
    @Test
    public void testNormalJoinGroupFollower() {
        Set<String> topics = Collections.singleton("test1");
        List<TopicPartition> ownedBefore = Collections.emptyList();
        List<TopicPartition> assignedAfter = Collections.singletonList(t1p);

        subscriptions.subscribe(topics, rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            return sync.data().memberId().equals("consumer")
                    && sync.data().generationId() == 1
                    && sync.groupAssignments().isEmpty();
        }, syncGroupResponse(assignedAfter, Errors.NONE));

        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(assignedAfter), subscriptions.assignedPartitions());
        Assertions.assertEquals(topics, subscriptions.metadataTopics());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(ownedBefore, assignedAfter), rebalanceListener.assigned);
    }

    /**
     * After a heartbeat answered with {@code NOT_COORDINATOR} leaves the coordinator unknown,
     * expects a zero-timeout {@code poll} to return {@code false} yet still record the current
     * time as the last poll time, so the poll timeout has not expired 59999 ms later.
     *
     * @throws Exception if waiting for the prepared heartbeat response to be consumed fails
     */
    @Test
    public void testUpdateLastHeartbeatPollWhenCoordinatorUnknown() throws Exception {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        // Advance the mock clock, then wait until every prepared response has been consumed.
        time.sleep(5000L);
        TestUtils.waitForCondition(
                () -> !client.hasPendingResponses(),
                "Failed to observe expected heartbeat from background thread");

        Assertions.assertTrue(coordinator.coordinatorUnknown());
        Assertions.assertFalse(coordinator.poll(time.timer(0L)));
        Assertions.assertEquals(time.milliseconds(), coordinator.heartbeat().lastPollTime());

        time.sleep(59999L);
        Assertions.assertFalse(coordinator.heartbeat().pollTimeoutExpired(time.milliseconds()));
    }

    /**
     * Joins as a follower with a {@code test.*} pattern subscription while a metadata update is
     * queued, and expects both partitions to be assigned and the subscription to equal
     * {@code test1} and {@code test2}, with one assignment callback and no revocation.
     */
    @Test
    public void testPatternJoinGroupFollower() {
        Set<String> expectedTopics = Utils.mkSet("test1", "test2");
        List<TopicPartition> ownedBefore = Collections.emptyList();
        List<TopicPartition> assignedAfter = Arrays.asList(t1p, t2p);

        subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
        // Initially only test1 is present in the metadata.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            return sync.data().memberId().equals("consumer")
                    && sync.data().generationId() == 1
                    && sync.groupAssignments().isEmpty();
        }, syncGroupResponse(assignedAfter, Errors.NONE));
        client.prepareMetadataUpdate(metadataResponse);

        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(assignedAfter.size(), subscriptions.numAssignedPartitions());
        Assertions.assertEquals(expectedTopics, subscriptions.subscription());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(ownedBefore, assignedAfter), rebalanceListener.assigned);
    }

    /**
     * Expects closing the coordinator after it joined as a follower to send a request that
     * reaches the prepared LeaveGroup matcher.
     */
    @Test
    public void testLeaveGroupOnClose() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(t1p));

        // The flag flips as soon as the matcher is consulted, before validation runs.
        AtomicBoolean leaveRequestSeen = new AtomicBoolean(false);
        LeaveGroupResponse leaveResponse =
                new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
        client.prepareResponse(request -> {
            leaveRequestSeen.set(true);
            return validateLeaveGroup("test-group", "consumer", (LeaveGroupRequest) request);
        }, leaveResponse);

        coordinator.close(time.timer(0L));
        Assertions.assertTrue(leaveRequestSeen.get());
    }

    /**
     * Expects {@code maybeLeaveGroup} to send a request that reaches the prepared LeaveGroup
     * matcher and to leave the coordinator without a stable generation.
     */
    @Test
    public void testMaybeLeaveGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(t1p));

        AtomicBoolean leaveRequestSeen = new AtomicBoolean(false);
        LeaveGroupResponse leaveResponse =
                new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
        client.prepareResponse(request -> {
            leaveRequestSeen.set(true);
            return validateLeaveGroup("test-group", "consumer", (LeaveGroupRequest) request);
        }, leaveResponse);

        coordinator.maybeLeaveGroup("test maybe leave group");

        Assertions.assertTrue(leaveRequestSeen.get());
        Assertions.assertNull(coordinator.generationIfStable());
    }

    /**
     * Checks that a LeaveGroup request targets the expected group and names exactly one
     * member with the expected member id.
     *
     * @param str               expected group id
     * @param str2              expected member id of the single leaving member
     * @param leaveGroupRequest request to inspect
     * @return {@code true} if the group id matches, the member list has exactly one entry and
     *         that entry's member id matches; {@code false} otherwise
     */
    private boolean validateLeaveGroup(String str, String str2, LeaveGroupRequest leaveGroupRequest) {
        List<LeaveGroupRequestData.MemberIdentity> members = leaveGroupRequest.data().members();
        return leaveGroupRequest.data().groupId().equals(str)
                && members.size() == 1
                && members.get(0).memberId().equals(str2);
    }

    /**
     * After a join attempt answered with {@code MEMBER_ID_REQUIRED} for member
     * {@code consumer-id}, expects {@code maybeLeaveGroup} to send a request that reaches the
     * prepared LeaveGroup matcher for that member id.
     */
    @Test
    public void testPendingMemberShouldLeaveGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(
                joinGroupFollowerResponse(-1, "consumer-id", "leader-id", Errors.MEMBER_ID_REQUIRED));
        coordinator.joinGroupIfNeeded(time.timer(0L));

        AtomicBoolean leaveRequestSeen = new AtomicBoolean(false);
        LeaveGroupResponse leaveResponse =
                new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()));
        client.prepareResponse(request -> {
            leaveRequestSeen.set(true);
            return validateLeaveGroup("test-group", "consumer-id", (LeaveGroupRequest) request);
        }, leaveResponse);

        coordinator.maybeLeaveGroup("pending member leaves");
        Assertions.assertTrue(leaveRequestSeen.get());
    }

    /**
     * Expects {@code joinGroupIfNeeded} to throw a {@code KafkaException} when the SyncGroup
     * response carries {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testUnexpectedErrorOnSyncGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_SERVER_ERROR));

        Assertions.assertThrows(
                KafkaException.class,
                () -> coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)));
    }

    /**
     * After a SyncGroup response carrying {@code UNKNOWN_MEMBER_ID}, expects the next JoinGroup
     * request to carry an empty member id and the retried rebalance to end with {@code t1p}
     * assigned.
     */
    @Test
    public void testUnknownMemberIdOnSyncGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First round: join succeeds, sync is rejected.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));

        // Second round: the rejoin must be sent with an empty member id.
        client.prepareResponse(
                request -> ((JoinGroupRequest) request).data().memberId().equals(""),
                joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));

        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.assignedPartitions());
    }

    /**
     * After a SyncGroup response carrying {@code REBALANCE_IN_PROGRESS}, expects a second
     * join/sync round to complete with {@code t1p} assigned and no rejoin pending.
     */
    @Test
    public void testRebalanceInProgressOnSyncGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First round: join succeeds, sync is rejected.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS));

        // Second round: both join and sync succeed.
        client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));

        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.assignedPartitions());
    }

    /**
     * After a SyncGroup response carrying {@code ILLEGAL_GENERATION}, expects the next JoinGroup
     * request to still carry member id {@code consumer} and the retried rebalance to end with
     * {@code t1p} assigned.
     */
    @Test
    public void testIllegalGenerationOnSyncGroup() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First round: join succeeds, sync is rejected.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));

        // Second round: the rejoin must reuse the existing member id.
        client.prepareResponse(
                request -> ((JoinGroupRequest) request).data().memberId().equals("consumer"),
                joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));

        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.assignedPartitions());
    }

    /**
     * After a completed rebalance as leader, expects a metadata update that changes the
     * partition count given for {@code test1} (to 2) to make a rejoin needed once the
     * subscription metadata is re-evaluated.
     */
    @Test
    public void testMetadataChangeTriggersRebalance() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.updateMetadata(metadataResponse);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        Map<String, List<String>> memberSubscriptions =
                Collections.singletonMap("consumer", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(t1p)));
        client.prepareResponse(joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());

        // Apply the changed metadata directly, then let the coordinator re-check it.
        metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 2)),
                false,
                time.milliseconds());
        coordinator.maybeUpdateSubscriptionMetadata();
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());
    }

    /**
     * After a leader join response built with the boolean flag set to {@code true}, expects the
     * metadata topics to be {@code test1} and a later metadata change for {@code test1} to make
     * a rejoin needed.
     *
     * <p>NOTE(review): the {@code true} argument to {@code joinGroupLeaderResponse} presumably
     * marks the response as skip-assignment; confirm against the helper.
     */
    @Test
    public void testStaticLeaderRejoinsGroupAndCanTriggersRebalance() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.updateMetadata(metadataResponse);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(joinGroupLeaderResponse(
                1,
                "consumer",
                Collections.singletonMap("consumer", Collections.singletonList("test1")),
                true,
                Errors.NONE,
                Optional.empty()));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton("test1"), coordinator.subscriptionState().metadataTopics());

        // Change the partition count given for test1 and let the coordinator re-check it.
        metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 2)),
                false,
                time.milliseconds());
        coordinator.maybeUpdateSubscriptionMetadata();
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());
    }

    /**
     * After a leader join response (boolean flag {@code true}) that lists a second member
     * subscribed to {@code test2}, expects the metadata topics to cover both {@code test1} and
     * {@code test2}, and a later metadata change for {@code test2} to make a rejoin needed.
     *
     * <p>NOTE(review): the {@code true} argument to {@code joinGroupLeaderResponse} presumably
     * marks the response as skip-assignment; confirm against the helper.
     */
    @Test
    public void testStaticLeaderRejoinsGroupAndCanDetectMetadataChangesForOtherMembers() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.updateMetadata(metadataResponse);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Typed varargs replace the raw Map.Entry[] array of the previous version.
        Map<String, List<String>> memberSubscriptions = Utils.mkMap(
                Utils.mkEntry("consumer", Collections.singletonList("test1")),
                Utils.mkEntry("consumer2", Collections.singletonList("test2")));
        client.prepareResponse(joinGroupLeaderResponse(
                1, "consumer", memberSubscriptions, true, Errors.NONE, Optional.empty()));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Utils.mkSet("test1", "test2"), coordinator.subscriptionState().metadataTopics());

        // Change metadata for the topic only the other member subscribes to.
        metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test2", 2)),
                false,
                time.milliseconds());
        coordinator.maybeUpdateSubscriptionMetadata();
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());
    }

    /**
     * Subscribes to "topic1" and "topic2" while metadata initially covers only "topic1". Metadata
     * for both topics is injected from inside the sync-group request matcher, i.e. while the first
     * rebalance is in progress. After a second join/sync round the test expects no pending rejoin
     * and both partitions assigned.
     */
    @Test
    public void testUpdateMetadataDuringRebalance() {
        final String firstTopic = "topic1";
        final String secondTopic = "topic2";
        final String leaderId = "leader";
        final TopicPartition firstPartition = new TopicPartition(firstTopic, 0);
        final TopicPartition secondPartition = new TopicPartition(secondTopic, 0);
        final List<String> topics = Arrays.asList(firstTopic, secondTopic);

        subscriptions.subscribe(new HashSet<>(topics), rebalanceListener);

        // Only the first topic is known at the start.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap(firstTopic, 1)));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        final Map<String, List<String>> memberSubscriptions = Collections.singletonMap(leaderId, topics);
        partitionAssignor.prepare(Collections.singletonMap(leaderId, Arrays.asList(firstPartition)));
        client.prepareResponse(joinGroupLeaderResponse(1, leaderId, memberSubscriptions, Errors.NONE));
        client.prepareResponse(request -> {
            SyncGroupRequest syncRequest = (SyncGroupRequest) request;
            boolean fromExpectedLeader = syncRequest.data().memberId().equals("leader")
                && syncRequest.data().generationId() == 1
                && syncRequest.groupAssignments().containsKey("leader");
            if (!fromExpectedLeader) {
                return false;
            }
            // Side effect of the matcher: publish metadata for both topics mid-rebalance.
            Map<String, Integer> topicMetadata = new HashMap<>();
            topicMetadata.put("topic1", 1);
            topicMetadata.put("topic2", 1);
            client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, topicMetadata));
            return true;
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(firstPartition), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        // Second round: the sync response now carries both partitions.
        client.prepareResponse(joinGroupLeaderResponse(2, leaderId, memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(Arrays.asList(firstPartition, secondPartition), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(
            new HashSet<>(Arrays.asList(firstPartition, secondPartition)), subscriptions.assignedPartitions());
    }

    /**
     * Verifies that a topic authorization failure and then a group authorization failure both
     * surface from {@code poll()}, and that after narrowing the subscription to "test1" the
     * consumer joins successfully with only "test1" as subscription and metadata topic.
     */
    @Test
    public void testSubscriptionChangeWithAuthorizationFailure() {
        final String clusterId = "kafka-cluster";
        final String allowedTopic = "test1";
        final String deniedTopic = "test2";
        final String memberId = "consumer";

        // Subscribe to both topics; the prepared metadata marks "test2" as unauthorized.
        subscriptions.subscribe(Utils.mkSet(allowedTopic, deniedTopic), rebalanceListener);
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(
            clusterId,
            1,
            Collections.singletonMap(deniedTopic, Errors.TOPIC_AUTHORIZATION_FAILED),
            Collections.singletonMap(allowedTopic, 1)));
        Assertions.assertThrows(TopicAuthorizationException.class, () -> {
            coordinator.poll(time.timer(Long.MAX_VALUE));
        });

        // Coordinator lookup succeeds, but the join is rejected with a group authorization error.
        client.respond(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
        client.prepareResponse(
            joinGroupLeaderResponse(0, memberId, Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        Assertions.assertThrows(GroupAuthorizationException.class, () -> {
            coordinator.poll(time.timer(Long.MAX_VALUE));
        });

        // Drop the unauthorized topic from the subscription.
        subscriptions.subscribe(Utils.mkSet(allowedTopic), rebalanceListener);
        Assertions.assertEquals(Collections.singleton(allowedTopic), subscriptions.metadataTopics());
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(
            clusterId, 1, Collections.emptyMap(), Collections.singletonMap(allowedTopic, 1)));

        // The join now succeeds and assigns t1p.
        final Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap(memberId, Collections.singletonList(allowedTopic));
        partitionAssignor.prepare(Collections.singletonMap(memberId, Collections.singletonList(t1p)));
        client.prepareResponse(joinGroupLeaderResponse(1, memberId, memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.singleton(allowedTopic), subscriptions.subscription());
        Assertions.assertEquals(Collections.singleton(allowedTopic), subscriptions.metadataTopics());
    }

    /**
     * Uses a rebalance listener that throws {@link WakeupException} from its first
     * {@code onPartitionsAssigned} call. The first {@code poll()} must propagate that exception;
     * the second must finish the join, leaving no pending rejoin, zero revocations and two
     * assignment callbacks.
     */
    @Test
    public void testWakeupFromAssignmentCallback() {
        final TopicPartition partition = new TopicPartition("topic1", 0);
        final Set<String> topics = Collections.singleton("topic1");
        final MockRebalanceListener wakingListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Capture the flag before delegating to super (which presumably bumps assignedCount).
                boolean firstAssignment = this.assignedCount == 0;
                super.onPartitionsAssigned(partitions);
                if (firstAssignment) {
                    throw new WakeupException();
                }
            }
        };

        subscriptions.subscribe(topics, wakingListener);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test1", 1)));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        partitionAssignor.prepare(Collections.singletonMap("follower", Collections.singletonList(partition)));
        client.prepareResponse(joinGroupFollowerResponse(1, "follower", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(partition), Errors.NONE));

        // The first poll runs the assignment callback, which raises the wakeup.
        Assertions.assertThrows(WakeupException.class, () -> {
            coordinator.poll(time.timer(Long.MAX_VALUE));
        }, "Expected exception thrown from assignment callback");

        // The second poll should complete the interrupted join.
        coordinator.poll(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(0, wakingListener.revokedCount);
        Assertions.assertEquals(2, wakingListener.assignedCount);
    }

    /** Runs the unavailable-topic scenario with a topic-name subscription and no extra unavailable topics. */
    @Test
    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
        final boolean patternSubscribe = false;
        final Set<String> unavailableTopicsInLastMetadata = Collections.emptySet();
        unavailableTopicTest(patternSubscribe, unavailableTopicsInLastMetadata);
    }

    /** Runs the unavailable-topic scenario with a pattern subscription and no extra unavailable topics. */
    @Test
    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
        final boolean patternSubscribe = true;
        final Set<String> unavailableTopicsInLastMetadata = Collections.emptySet();
        unavailableTopicTest(patternSubscribe, unavailableTopicsInLastMetadata);
    }

    /**
     * Runs the unavailable-topic scenario with a pattern subscription while the final metadata
     * update still reports the topic "notmatching" as unknown.
     */
    @Test
    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSubscribe() {
        final boolean patternSubscribe = true;
        final Set<String> unavailableTopicsInLastMetadata = Collections.singleton("notmatching");
        unavailableTopicTest(patternSubscribe, unavailableTopicsInLastMetadata);
    }

    /**
     * Shared scenario: the first metadata response reports "test1" as unknown, so the first
     * rebalance yields an empty assignment and a metadata refresh is requested. Once metadata for
     * "test1" arrives, a second rebalance must assign t1p without requesting another refresh.
     *
     * @param z   {@code true} to subscribe with the pattern "test.*", {@code false} to subscribe
     *            to the topic "test1" by name
     * @param set topics reported with UNKNOWN_TOPIC_OR_PARTITION in the second metadata update
     */
    private void unavailableTopicTest(boolean z, Set<String> set) {
        if (z) {
            subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
        } else {
            subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        }

        // First metadata response: "test1" is unknown.
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(
            "kafka-cluster",
            1,
            Collections.singletonMap("test1", Errors.UNKNOWN_TOPIC_OR_PARTITION),
            Collections.emptyMap()));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First rebalance ends with an empty assignment.
        final Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap("consumer", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.emptyMap());
        client.prepareResponse(joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.emptySet(), rebalanceListener.assigned);
        Assertions.assertTrue(metadata.updateRequested(), "Metadata refresh not requested for unavailable partitions");

        // Second metadata response: "test1" is present; only the caller-supplied topics are unknown.
        final Map<String, Errors> topicErrors = new HashMap<>();
        for (String unavailableTopic : set) {
            topicErrors.put(unavailableTopic, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        }
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(
            "kafka-cluster", 1, topicErrors, Collections.singletonMap("test1", 1)));
        consumerClient.poll(time.timer(0L));

        // Second rebalance assigns t1p.
        client.prepareResponse(joinGroupLeaderResponse(2, "consumer", memberSubscriptions, Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(metadata.updateRequested(), "Metadata refresh requested unnecessarily");
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(Collections.singleton(t1p), rebalanceListener.assigned);
    }

    /** Runs the internal-topic scenario with the flag set to {@code false}. */
    @Test
    public void testExcludeInternalTopicsConfigOption() {
        final boolean includeInternalTopics = false;
        testInternalTopicInclusion(includeInternalTopics);
    }

    /** Runs the internal-topic scenario with the flag set to {@code true}. */
    @Test
    public void testIncludeInternalTopicsConfigOption() {
        final boolean includeInternalTopics = true;
        testInternalTopicInclusion(includeInternalTopics);
    }

    /**
     * Rebuilds the metadata, client and coordinator with the given flag, subscribes with the
     * pattern ".*", publishes metadata that contains only "__consumer_offsets", and asserts that
     * the topic is part of the subscription exactly when the flag is {@code true}.
     *
     * @param z value passed as the third {@code ConsumerMetadata} constructor argument
     *          (presumably "include internal topics" — confirm against that constructor); also
     *          the expected result of the final membership check
     */
    private void testInternalTopicInclusion(boolean z) {
        metadata = new ConsumerMetadata(0L, Long.MAX_VALUE, z, false, subscriptions, new LogContext(), new ClusterResourceListeners());
        client = new MockClient((Time) time, (Metadata) metadata);

        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, subscriptions)) {
            subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);

            // Single-broker cluster whose only topic is "__consumer_offsets" (flagged via the `true` argument).
            final Node broker = new Node(0, "localhost", 9999);
            final List<Integer> brokerIds = Collections.singletonList(broker.id());
            final MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
                Errors.NONE,
                new TopicPartition("__consumer_offsets", 0),
                Optional.of(broker.id()),
                Optional.empty(),
                brokerIds,
                brokerIds,
                brokerIds);
            final MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(
                Errors.NONE, "__consumer_offsets", true, Collections.singletonList(partitionMetadata));
            client.updateMetadata(RequestTestUtils.metadataResponse(
                Collections.singletonList(broker), "clusterId", broker.id(), Collections.singletonList(topicMetadata)));

            testCoordinator.maybeUpdateSubscriptionMetadata();
            Assertions.assertEquals(z, subscriptions.subscription().contains("__consumer_offsets"));
        }
    }

    /**
     * Joins as follower and receives t1p, then widens the subscription to "test1" and
     * "otherTopic" and rejoins with the same assignment. Checks the revoked/assigned callback
     * counts and payloads after each round.
     */
    @Test
    public void testRejoinGroup() {
        final List<TopicPartition> owned = Collections.emptyList();
        final List<TopicPartition> assigned = Arrays.asList(t1p);

        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        joinAsFollowerAndReceiveAssignment(coordinator, assigned);

        // First join: nothing revoked, one assignment callback.
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);

        // Reset the recorded payloads so the second round is observed in isolation.
        rebalanceListener.revoked = null;
        rebalanceListener.assigned = null;

        // Changing the subscription forces a rejoin; the sync response repeats the same assignment.
        subscriptions.subscribe(new HashSet<>(Arrays.asList("test1", "otherTopic")), rebalanceListener);
        client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(assigned, Errors.NONE));
        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        // Expected callback payloads are derived by the helpers from (previously owned, newly assigned).
        final Collection<TopicPartition> revoked = getRevoked(assigned, assigned);
        final Collection<TopicPartition> added = getAdded(assigned, assigned);
        Assertions.assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
        Assertions.assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
        Assertions.assertEquals(2, rebalanceListener.assignedCount);
        Assertions.assertEquals(added, rebalanceListener.assigned);
    }

    /**
     * Prepares a join response whose second {@code prepareResponse} argument is {@code true}
     * (presumably "disconnect" — confirm against MockClient), followed by a fresh coordinator
     * lookup, join and sync. Expects the join to complete with t1p assigned and exactly one
     * assignment callback.
     */
    @Test
    public void testDisconnectInJoin() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        final List<TopicPartition> previouslyOwned = Collections.emptyList();
        final List<TopicPartition> expectedAssignment = Arrays.asList(t1p);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // First join attempt, prepared with the extra `true` flag.
        client.prepareResponse(
            (AbstractResponse) joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);

        // Responses for the retry: coordinator lookup, join, sync.
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(expectedAssignment, Errors.NONE));
        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(TestUtils.toSet(expectedAssignment), subscriptions.assignedPartitions());
        Assertions.assertEquals(0, rebalanceListener.revokedCount);
        Assertions.assertNull(rebalanceListener.revoked);
        Assertions.assertEquals(1, rebalanceListener.assignedCount);
        Assertions.assertEquals(getAdded(previouslyOwned, expectedAssignment), rebalanceListener.assigned);
    }

    /**
     * Expects {@code joinGroupIfNeeded} to throw an {@link ApiException} when the join response
     * carries INVALID_SESSION_TIMEOUT.
     */
    @Test
    public void testInvalidSessionTimeout() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // The join is answered with the error under test.
        client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
        Assertions.assertThrows(ApiException.class, () -> {
            coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
        });
    }

    /**
     * Commits offset 100 for the manually assigned t1p asynchronously and checks that the flag
     * handed to the {@code callback} helper is set once completed callbacks are invoked.
     */
    @Test
    public void testCommitOffsetOnly() {
        final long offset = 100L;
        subscriptions.assignFromUser(Collections.singleton(t1p));

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, offset), Errors.NONE);

        final AtomicBoolean callbackFlag = new AtomicBoolean(false);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(offset));
        coordinator.commitOffsetsAsync(offsets, callback(callbackFlag));
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(callbackFlag.get());
    }

    /** Runs the in-flight-commit failure scenario with a NOT_COORDINATOR commit error. */
    @Test
    public void testCoordinatorDisconnectAfterNotCoordinatorError() {
        final Errors commitError = Errors.NOT_COORDINATOR;
        testInFlightRequestsFailedAfterCoordinatorMarkedDead(commitError);
    }

    /** Runs the in-flight-commit failure scenario with a COORDINATOR_NOT_AVAILABLE commit error. */
    @Test
    public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() {
        final Errors commitError = Errors.COORDINATOR_NOT_AVAILABLE;
        testInFlightRequestsFailedAfterCoordinatorMarkedDead(commitError);
    }

    /**
     * Sends two async commits, answers one offset-commit request with the given error, and
     * asserts that the coordinator becomes unknown and both callbacks receive a
     * {@link RetriableCommitFailedException}.
     *
     * @param errors error used when responding to the offset-commit request
     */
    private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors errors) {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Two commits are queued before any response is delivered.
        final MockCommitCallback firstCallback = new MockCommitCallback();
        final MockCommitCallback secondCallback = new MockCommitCallback();
        coordinator.commitOffsetsAsync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), firstCallback);
        coordinator.commitOffsetsAsync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), secondCallback);

        // A single error response is supplied; the client is then polled twice.
        respondToOffsetCommitRequest(Collections.singletonMap(t1p, 100L), errors);
        consumerClient.pollNoWakeup();
        consumerClient.pollNoWakeup();
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(coordinator.coordinatorUnknown());
        Assertions.assertTrue(firstCallback.exception instanceof RetriableCommitFailedException);
        Assertions.assertTrue(secondCallback.exception instanceof RetriableCommitFailedException);
    }

    /**
     * With a coordinator built with the fourth {@code buildCoordinator} argument set to
     * {@code true} (presumably auto-commit enabled — confirm against buildCoordinator), joins as
     * follower, seeks t1p to 100, sleeps 2000 ms and polls. The prepared offset-commit response
     * must have been consumed.
     */
    @Test
    public void testAutoCommitDynamicAssignment() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
            joinAsFollowerAndReceiveAssignment(testCoordinator, Collections.singletonList(t1p));
            subscriptions.seek(t1p, 100L);

            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
            time.sleep(2000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));

            // No pending responses means the commit request was sent and matched.
            Assertions.assertFalse(client.hasPendingResponses());
        }
    }

    /**
     * After a commit fails with NOT_COORDINATOR and the coordinator is rediscovered, checks that
     * no commit request is in flight 50 ms later and that exactly one is in flight after a
     * further 50 ms (presumably the retry backoff — confirm against the rebalance config).
     */
    @Test
    public void testAutoCommitRetryBackoff() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
            joinAsFollowerAndReceiveAssignment(testCoordinator, Collections.singletonList(t1p));
            subscriptions.seek(t1p, 100L);
            time.sleep(2000L);

            // The first commit attempt fails and the coordinator becomes unknown.
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertTrue(testCoordinator.coordinatorUnknown());

            // Rediscover the coordinator.
            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            testCoordinator.poll(time.timer(Long.MAX_VALUE));

            // 50 ms after the failure: still nothing in flight.
            subscriptions.seek(t1p, 200L);
            time.sleep(50L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(0, client.inFlightRequestCount());

            // Another 50 ms later: the commit is sent.
            time.sleep(50L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(1, client.inFlightRequestCount());

            respondToOffsetCommitRequest(Collections.singletonMap(t1p, 200L), Errors.NONE);
        }
    }

    /**
     * Tracks the number of in-flight requests across a sequence of sleeps and polls: one request
     * after the initial 2000 ms, still one after 1000 ms more while unanswered, none after the
     * response, none immediately after a new seek, and one again after a further 1000 ms.
     */
    @Test
    public void testAutoCommitAwaitsInterval() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
            joinAsFollowerAndReceiveAssignment(testCoordinator, Collections.singletonList(t1p));
            subscriptions.seek(t1p, 100L);

            // After 2000 ms the first commit goes out.
            time.sleep(2000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(1, client.inFlightRequestCount());

            // While that request is unanswered, no second one is sent.
            time.sleep(1000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(1, client.inFlightRequestCount());

            // Answer the outstanding commit.
            respondToOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(0, client.inFlightRequestCount());

            // A new position alone does not trigger an immediate commit.
            subscriptions.seek(t1p, 200L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(0, client.inFlightRequestCount());

            // After another 1000 ms the next commit goes out.
            time.sleep(1000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));
            Assertions.assertEquals(1, client.inFlightRequestCount());

            respondToOffsetCommitRequest(Collections.singletonMap(t1p, 200L), Errors.NONE);
        }
    }

    /**
     * Sleeps 2000 ms and polls the consumer client before the group is joined, then joins as
     * follower, seeks t1p to 100, sleeps another 2000 ms and polls the coordinator. The prepared
     * offset-commit response must have been consumed.
     */
    @Test
    public void testAutoCommitDynamicAssignmentRebalance() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            testCoordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

            // Let time pass before any assignment exists.
            time.sleep(2000L);
            consumerClient.poll(time.timer(0L));

            // Join the group and receive t1p.
            client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            client.prepareResponse(syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
            testCoordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
            subscriptions.seek(t1p, 100L);

            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
            time.sleep(2000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));

            // No pending responses means the commit request was sent and matched.
            Assertions.assertFalse(client.hasPendingResponses());
        }
    }

    /**
     * With t1p assigned manually and positioned at 100, sleeps 2000 ms and polls. The prepared
     * offset-commit response must have been consumed.
     */
    @Test
    public void testAutoCommitManualAssignment() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.assignFromUser(Collections.singleton(t1p));
            subscriptions.seek(t1p, 100L);

            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            testCoordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
            time.sleep(2000L);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));

            // No pending responses means the commit request was sent and matched.
            Assertions.assertFalse(client.hasPendingResponses());
        }
    }

    /**
     * With t1p assigned manually and positioned at 100, lets 2000 ms pass before the coordinator
     * is located, then locates it, sleeps 100 ms and polls. The prepared offset-commit response
     * must have been consumed.
     */
    @Test
    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
        // try-with-resources closes the coordinator on every exit path.
        try (ConsumerCoordinator testCoordinator = buildCoordinator(rebalanceConfig, new Metrics(), assignors, true, subscriptions)) {
            subscriptions.assignFromUser(Collections.singleton(t1p));
            subscriptions.seek(t1p, 100L);

            // No coordinator response has been prepared yet while time passes.
            consumerClient.poll(time.timer(0L));
            time.sleep(2000L);
            consumerClient.poll(time.timer(0L));

            // Now the coordinator is found.
            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            testCoordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

            time.sleep(100L);
            prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
            testCoordinator.poll(time.timer(Long.MAX_VALUE));

            // No pending responses means the commit request was sent and matched.
            Assertions.assertFalse(client.hasPendingResponses());
        }
    }

    /**
     * Commits offset 100 with metadata "hello" for t1p asynchronously and checks that the flag
     * handed to the two-argument {@code callback} helper is set once completed callbacks are
     * invoked.
     */
    @Test
    public void testCommitOffsetMetadata() {
        subscriptions.assignFromUser(Collections.singleton(t1p));

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

        final AtomicBoolean callbackFlag = new AtomicBoolean(false);
        final OffsetAndMetadata offsetWithMetadata = new OffsetAndMetadata(100L, "hello");
        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(t1p, offsetWithMetadata);
        coordinator.commitOffsetsAsync(offsets, callback(offsets, callbackFlag));
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(callbackFlag.get());
    }

    /**
     * Commits asynchronously with {@code mockOffsetCommitCallback} and checks that its invocation
     * count grows by one and that it recorded no exception.
     */
    @Test
    public void testCommitOffsetAsyncWithDefaultCallback() {
        final int invokedBefore = mockOffsetCommitCallback.invoked;

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, mockOffsetCommitCallback);
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertEquals(invokedBefore + 1, mockOffsetCommitCallback.invoked);
        Assertions.assertNull(mockOffsetCommitCallback.exception);
    }

    /**
     * Joins the group, leaves it, switches to a manual assignment of t1p and commits. The request
     * matcher requires the commit to carry an empty member id and generation id -1; the flag
     * handed to the {@code callback} helper must be set afterwards.
     */
    @Test
    public void testCommitAfterLeaveGroup() {
        // Join the group as a subscriber first.
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        joinAsFollowerAndReceiveAssignment(coordinator, Collections.singletonList(t1p));

        // Leave the group.
        final LeaveGroupResponseData leaveData = new LeaveGroupResponseData().setErrorCode(Errors.NONE.code());
        client.prepareResponse(new LeaveGroupResponse(leaveData));
        subscriptions.unsubscribe();
        coordinator.maybeLeaveGroup("test commit after leave");

        // Continue with a manual assignment.
        subscriptions.assignFromUser(Collections.singleton(t1p));

        // Only a commit with the expected member id and generation id receives the response.
        client.prepareResponse(request -> {
            OffsetCommitRequest commitRequest = (OffsetCommitRequest) request;
            boolean emptyMemberId = commitRequest.data().memberId().equals("");
            return emptyMemberId && commitRequest.data().generationId() == -1;
        }, (AbstractResponse) offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));

        final AtomicBoolean callbackFlag = new AtomicBoolean(false);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, callback(callbackFlag));
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(callbackFlag.get());
    }

    /**
     * Commits asynchronously with {@code mockOffsetCommitCallback} while the commit response
     * carries COORDINATOR_NOT_AVAILABLE. The callback must be invoked once more and record a
     * {@link RetriableCommitFailedException}.
     */
    @Test
    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
        final int invokedBefore = mockOffsetCommitCallback.invoked;

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, mockOffsetCommitCallback);
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertEquals(invokedBefore + 1, mockOffsetCommitCallback.invoked);
        Assertions.assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    /**
     * Async commit answered with COORDINATOR_NOT_AVAILABLE: the coordinator must become unknown
     * and the callback must be invoked once with a {@link RetriableCommitFailedException}.
     */
    @Test
    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        final MockCommitCallback commitCallback = new MockCommitCallback();
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, commitCallback);
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(coordinator.coordinatorUnknown());
        Assertions.assertEquals(1, commitCallback.invoked);
        Assertions.assertTrue(commitCallback.exception instanceof RetriableCommitFailedException);
    }

    /**
     * Async commit answered with NOT_COORDINATOR: the coordinator must become unknown and the
     * callback must be invoked once with a {@link RetriableCommitFailedException}.
     */
    @Test
    public void testCommitOffsetAsyncNotCoordinator() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        final MockCommitCallback commitCallback = new MockCommitCallback();
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
        final Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, commitCallback);
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(coordinator.coordinatorUnknown());
        Assertions.assertEquals(1, commitCallback.invoked);
        Assertions.assertTrue(commitCallback.exception instanceof RetriableCommitFailedException);
    }

    /**
     * Verifies that an async commit whose request is prepared via
     * {@code prepareOffsetCommitRequestDisconnect} leaves the coordinator unknown and completes
     * the callback exactly once with a {@link RetriableCommitFailedException}.
     */
    @Test
    public void testCommitOffsetAsyncDisconnected() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        MockCommitCallback callback = new MockCommitCallback();
        Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(t1p, 100L);
        prepareOffsetCommitRequestDisconnect(expectedOffsets);

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        coordinator.commitOffsetsAsync(offsets, callback);
        coordinator.invokeCompletedOffsetCommitCallbacks();

        Assertions.assertTrue(coordinator.coordinatorUnknown());
        Assertions.assertEquals(1, callback.invoked);
        Assertions.assertTrue(callback.exception instanceof RetriableCommitFailedException);
    }

    /**
     * Verifies that a synchronous commit completes successfully when the first commit response
     * carries {@code NOT_COORDINATOR} and is followed by a fresh coordinator lookup and a
     * successful commit response.
     */
    @Test
    public void testCommitOffsetSyncNotCoordinator() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Responses are queued in the order they are expected to be consumed:
        // failed commit, coordinator lookup, successful commit.
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

        // Previously the result was discarded, so the test asserted nothing about the commit outcome.
        Assertions.assertTrue(coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that a synchronous commit completes successfully when the first commit response
     * carries {@code COORDINATOR_NOT_AVAILABLE} and is followed by a fresh coordinator lookup
     * and a successful commit response.
     */
    @Test
    public void testCommitOffsetSyncCoordinatorNotAvailable() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Responses are queued in the order they are expected to be consumed:
        // failed commit, coordinator lookup, successful commit.
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

        // Previously the result was discarded, so the test asserted nothing about the commit outcome.
        Assertions.assertTrue(coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that a synchronous commit completes successfully when the first commit request is
     * prepared via {@code prepareOffsetCommitRequestDisconnect} and is followed by a fresh
     * coordinator lookup and a successful commit response.
     */
    @Test
    public void testCommitOffsetSyncCoordinatorDisconnected() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Responses are queued in the order they are expected to be consumed:
        // disconnected commit, coordinator lookup, successful commit.
        prepareOffsetCommitRequestDisconnect(Collections.singletonMap(t1p, 100L));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.NONE);

        // Previously the result was discarded, so the test asserted nothing about the commit outcome.
        Assertions.assertTrue(coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies the completion order of an async commit followed by a sync commit issued from a
     * second thread: the async commit's callback must record its offset before the sync commit
     * call returns and records its own.
     *
     * @throws Exception if waiting for the requests or joining the committer thread fails
     */
    @Test
    public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Written from both the callback and the committer thread, hence the synchronized wrapper.
        final List<OffsetAndMetadata> completionOrder = Collections.synchronizedList(new ArrayList<>());
        final OffsetAndMetadata asyncOffset = new OffsetAndMetadata(0L);
        final OffsetAndMetadata syncOffset = new OffsetAndMetadata(1L);

        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, asyncOffset),
            (offsets, exception) -> completionOrder.add(asyncOffset));

        // The sync commit blocks, so it runs on its own thread while this thread feeds responses.
        Thread syncCommitter = new Thread(() -> {
            coordinator.commitOffsetsSync(Collections.singletonMap(t1p, syncOffset), time.timer(10000L));
            completionOrder.add(syncOffset);
        });
        syncCommitter.start();

        // Wait until both commit requests are in flight before answering them in order.
        client.waitForRequests(2, 5000L);
        respondToOffsetCommitRequest(Collections.singletonMap(t1p, asyncOffset.offset()), Errors.NONE);
        respondToOffsetCommitRequest(Collections.singletonMap(t1p, syncOffset.offset()), Errors.NONE);
        syncCommitter.join();

        Assertions.assertEquals(Arrays.asList(asyncOffset, syncOffset), completionOrder);
    }

    /**
     * Verifies that a synchronous commit returns {@code true} when the first commit response
     * carries {@code UNKNOWN_TOPIC_OR_PARTITION} and the second carries no error.
     */
    @Test
    public void testRetryCommitUnknownTopicOrPartition() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata"));
        boolean committed = coordinator.commitOffsetsSync(offsets, time.timer(10000L));
        Assertions.assertTrue(committed);
    }

    /**
     * Verifies that a synchronous commit answered with {@code OFFSET_METADATA_TOO_LARGE} throws
     * {@code OffsetMetadataTooLarge}.
     */
    @Test
    public void testCommitOffsetMetadataTooLarge() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE);

        Assertions.assertThrows(OffsetMetadataTooLarge.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
            time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that a synchronous commit answered with {@code ILLEGAL_GENERATION} throws
     * {@link CommitFailedException}.
     */
    @Test
    public void testCommitOffsetIllegalGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);

        Assertions.assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
            time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that a synchronous commit answered with {@code UNKNOWN_MEMBER_ID} throws
     * {@link CommitFailedException}.
     */
    @Test
    public void testCommitOffsetUnknownMemberId() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);

        Assertions.assertThrows(CommitFailedException.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
            time.timer(Long.MAX_VALUE)));
    }

    /**
     * Sends a commit under generation 1, then installs generation 2 and moves the member to
     * {@code PREPARING_REBALANCE} before the {@code ILLEGAL_GENERATION} response is polled.
     * Asserts that the future's exception class accepts the {@code REBALANCE_IN_PROGRESS}
     * exception and that the newer generation is still the current one.
     */
    @Test
    public void testCommitOffsetIllegalGenerationWithNewGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Change the generation and state while the commit is still in flight.
        AbstractCoordinator.Generation replacement =
            new AbstractCoordinator.Generation(2, "memberId-new", (String) null);
        coordinator.setNewGeneration(replacement);
        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assertions.assertEquals(replacement, coordinator.generation());
    }

    /**
     * Joins the group as a follower, then sends a commit answered with
     * {@code ILLEGAL_GENERATION}. Asserts that the generation id and protocol name afterwards
     * match {@code NO_GENERATION} while the member id is still {@code "consumer"}.
     */
    @Test
    public void testCommitOffsetIllegalGenerationShouldResetGenerationId() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Complete a join/sync round so the coordinator holds a real generation.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));
        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));

        Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION.generationId,
            coordinator.generation().generationId);
        Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION.protocolName,
            coordinator.generation().protocolName);
        Assertions.assertEquals("consumer", coordinator.generation().memberId);
    }

    /**
     * Sends a commit under generation 1, then resets the generation to {@code NO_GENERATION}
     * before the {@code ILLEGAL_GENERATION} response is polled. Asserts that the future's
     * exception class accepts a {@link CommitFailedException} and that the generation remains
     * {@code NO_GENERATION}.
     */
    @Test
    public void testCommitOffsetIllegalGenerationWithResetGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Reset the generation while the commit is still in flight.
        coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(new CommitFailedException()));
        Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
    }

    /**
     * Sends a commit under generation 1, then installs generation 2 and moves the member to
     * {@code PREPARING_REBALANCE} before the {@code UNKNOWN_MEMBER_ID} response is polled.
     * Asserts that the future's exception class accepts the {@code REBALANCE_IN_PROGRESS}
     * exception and that the newer generation is still the current one.
     */
    @Test
    public void testCommitOffsetUnknownMemberWithNewGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Change the generation and state while the commit is still in flight.
        AbstractCoordinator.Generation replacement =
            new AbstractCoordinator.Generation(2, "memberId-new", (String) null);
        coordinator.setNewGeneration(replacement);
        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assertions.assertEquals(replacement, coordinator.generation());
    }

    /**
     * Sends a commit under generation 1, then resets the generation to {@code NO_GENERATION}
     * before the {@code UNKNOWN_MEMBER_ID} response is polled. Asserts that the future's
     * exception class accepts a {@link CommitFailedException} and that the generation remains
     * {@code NO_GENERATION}.
     */
    @Test
    public void testCommitOffsetUnknownMemberWithResetGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Reset the generation while the commit is still in flight.
        coordinator.setNewGeneration(AbstractCoordinator.Generation.NO_GENERATION);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(new CommitFailedException()));
        Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
    }

    /**
     * Joins the group as a follower, then sends a commit answered with
     * {@code UNKNOWN_MEMBER_ID}. Asserts that the coordinator's generation afterwards equals
     * {@code NO_GENERATION}.
     */
    @Test
    public void testCommitOffsetUnknownMemberShouldResetToNoGeneration() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Complete a join/sync round so the coordinator holds a real generation.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE));
        coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));
        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));

        Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, coordinator.generation());
    }

    /**
     * Sends a commit under generation 1 while the member is in {@code PREPARING_REBALANCE},
     * then installs generation 2 before the {@code FENCED_INSTANCE_ID} response is polled.
     * Asserts that the future's exception class accepts the {@code REBALANCE_IN_PROGRESS}
     * exception and that the newer generation is still the current one.
     */
    @Test
    public void testCommitOffsetFencedInstanceWithRebalancingGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);
        coordinator.setNewState(AbstractCoordinator.MemberState.PREPARING_REBALANCE);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Change the generation while the commit is still in flight.
        AbstractCoordinator.Generation replacement =
            new AbstractCoordinator.Generation(2, "memberId-new", (String) null);
        coordinator.setNewGeneration(replacement);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(Errors.REBALANCE_IN_PROGRESS.exception()));
        Assertions.assertEquals(replacement, coordinator.generation());
    }

    /**
     * Sends a commit under generation 1, then installs generation 2 (without changing the member
     * state) before the {@code FENCED_INSTANCE_ID} response is polled. Asserts that the future's
     * exception class accepts a {@link CommitFailedException} and that the newer generation is
     * still the current one.
     */
    @Test
    public void testCommitOffsetFencedInstanceWithNewGeneration() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        AbstractCoordinator.Generation initialGeneration =
            new AbstractCoordinator.Generation(1, "memberId", (String) null);
        coordinator.setNewGeneration(initialGeneration);

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.FENCED_INSTANCE_ID);
        RequestFuture<?> commitFuture = coordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        // Change the generation while the commit is still in flight.
        AbstractCoordinator.Generation replacement =
            new AbstractCoordinator.Generation(2, "memberId-new", (String) null);
        coordinator.setNewGeneration(replacement);

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(30000L)));
        Assertions.assertTrue(
            commitFuture.exception().getClass().isInstance(new CommitFailedException()));
        Assertions.assertEquals(replacement, coordinator.generation());
    }

    /**
     * Builds a coordinator from a rebalance config created with the fixture's group instance id
     * and commits without having joined the group. The request matcher only accepts an offset
     * commit whose group instance id is {@code null} and whose member id is empty; the commit
     * future must complete without failure.
     */
    @Test
    public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
        ConsumerCoordinator staticMemberCoordinator =
            buildCoordinator(rebalanceConfig, new Metrics(), assignors, false, subscriptions);

        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        staticMemberCoordinator.ensureCoordinatorReady(time.timer(5000L));

        client.prepareResponse(request -> {
            OffsetCommitRequestData commitData = ((OffsetCommitRequest) request).data();
            if (commitData.groupInstanceId() != null) {
                return false;
            }
            return commitData.memberId().isEmpty();
        }, (AbstractResponse) offsetCommitResponse(Collections.emptyMap()));

        RequestFuture<?> commitFuture = staticMemberCoordinator.sendOffsetCommitRequest(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")));

        Assertions.assertTrue(consumerClient.poll(commitFuture, time.timer(5000L)));
        Assertions.assertFalse(commitFuture.failed());
    }

    /**
     * Exercises synchronous commits around a rebalance in two phases.
     *
     * <p>Phase one: after {@code ensureActiveGroup} with a zero timeout there is no stable
     * generation, and a sync commit throws {@code RebalanceInProgressException}.
     *
     * <p>Phase two: once the join and sync responses have been processed the generation is
     * stable; a commit answered with {@code REBALANCE_IN_PROGRESS} throws the same exception,
     * flags that a rejoin is needed, and leaves the stable generation unchanged.
     */
    @Test
    public void testCommitOffsetRebalanceInProgress() {
        subscriptions.subscribe(Collections.singleton("test1"), rebalanceListener);
        client.updateMetadata(metadataResponse);
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        Map<String, List<String>> memberSubscriptions =
            Collections.singletonMap("leader", Collections.singletonList("test1"));
        partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(t1p)));

        // Phase one: zero timeout, so the group is not stable when the call returns.
        coordinator.ensureActiveGroup(time.timer(0L));
        Assertions.assertTrue(coordinator.rejoinNeededOrPending());
        Assertions.assertNull(coordinator.generationIfStable());
        Assertions.assertThrows(RebalanceInProgressException.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
            time.timer(Long.MAX_VALUE)));

        // Answer the join request from the node id derived from the fixture node.
        client.respondFrom(
            joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE),
            new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        client.prepareResponse(request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            if (!sync.data().memberId().equals("leader")) {
                return false;
            }
            if (sync.data().generationId() != 1) {
                return false;
            }
            return sync.groupAssignments().containsKey("leader");
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(t1p), Errors.NONE));
        coordinator.poll(time.timer(Long.MAX_VALUE));

        // Phase two: the generation is stable until the commit reports a rebalance.
        AbstractCoordinator.Generation stableGeneration =
            new AbstractCoordinator.Generation(1, "leader", partitionAssignor.name());
        Assertions.assertFalse(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(stableGeneration, coordinator.generationIfStable());

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
        Assertions.assertThrows(RebalanceInProgressException.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")),
            time.timer(Long.MAX_VALUE)));

        Assertions.assertTrue(coordinator.rejoinNeededOrPending());
        Assertions.assertEquals(stableGeneration, coordinator.generationIfStable());
    }

    /**
     * Verifies that a synchronous commit answered with {@code UNKNOWN_SERVER_ERROR} throws a
     * {@link KafkaException}.
     */
    @Test
    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(t1p, 100L), Errors.UNKNOWN_SERVER_ERROR);

        Assertions.assertThrows(KafkaException.class, () -> coordinator.commitOffsetsSync(
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L)),
            time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that a synchronous commit with a zero timeout and no commit response queued
     * returns {@code false}.
     */
    @Test
    public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        Map<TopicPartition, OffsetAndMetadata> offsets =
            Collections.singletonMap(t1p, new OffsetAndMetadata(100L));
        boolean committed = coordinator.commitOffsetsSync(offsets, time.timer(0L));
        Assertions.assertFalse(committed);
    }

    /**
     * Verifies that refreshing committed offsets for a user-assigned partition applies the
     * fetched offset (100): no partition is left initializing, all fetch positions are known,
     * and the position equals the fetched value.
     */
    @Test
    public void testRefreshOffset() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertTrue(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(100L, subscriptions.position(t1p).offset);
    }

    /**
     * Verifies the state after refreshing a committed offset that carries leader epoch 3 while
     * the metadata update maps every partition to epoch 4: the partition is no longer
     * initializing but awaits validation, so it has a position (offset 100) but no valid
     * position yet.
     */
    @Test
    public void testRefreshOffsetWithValidation() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
        subscriptions.assignFromUser(Collections.singleton(t1p));

        // The explicit casts pin the intended metadataUpdateWith overload.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(
            "kafka-cluster",
            1,
            (Map<String, Errors>) Collections.emptyMap(),
            (Map<String, Integer>) Collections.singletonMap("test1", 1),
            (Function<TopicPartition, Integer>) topicPartition -> 4));

        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L, Optional.of(3)));
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertFalse(subscriptions.hasAllFetchPositions());
        Assertions.assertTrue(subscriptions.awaitingValidation(t1p));
        // Expected value first, so a failure message reports expected/actual the right way round.
        Assertions.assertEquals(100L, subscriptions.position(t1p).offset);
        Assertions.assertNull(subscriptions.validPosition(t1p));
    }

    /**
     * Verifies that {@code fetchCommittedOffsets} converts a successful offset fetch response
     * (offset 500, leader epoch 15, metadata {@code "blahblah"}) into the matching
     * {@link OffsetAndMetadata} for the requested partition.
     */
    @Test
    public void testFetchCommittedOffsets() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        long offset = 500L;
        String metadata = "blahblah";
        Optional<Integer> leaderEpoch = Optional.of(15);
        OffsetFetchResponse.PartitionData partitionData =
            new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE);
        client.prepareResponse(offsetFetchResponse(Errors.NONE, Collections.singletonMap(t1p, partitionData)));

        Map<TopicPartition, OffsetAndMetadata> committed =
            coordinator.fetchCommittedOffsets(Collections.singleton(t1p), time.timer(Long.MAX_VALUE));

        Assertions.assertNotNull(committed);
        Assertions.assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), committed.get(t1p));
    }

    /**
     * Verifies that a partition-level {@code TOPIC_AUTHORIZATION_FAILED} error in an offset
     * fetch response makes {@code fetchCommittedOffsets} throw a
     * {@code TopicAuthorizationException} whose unauthorized topics are exactly
     * {@code "test1"}.
     */
    @Test
    public void testTopicAuthorizationFailedInOffsetFetch() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        OffsetFetchResponse.PartitionData unauthorized = new OffsetFetchResponse.PartitionData(
            -1L, Optional.empty(), "", Errors.TOPIC_AUTHORIZATION_FAILED);
        client.prepareResponse(offsetFetchResponse(Errors.NONE, Collections.singletonMap(t1p, unauthorized)));

        TopicAuthorizationException thrown = Assertions.assertThrows(TopicAuthorizationException.class,
            () -> coordinator.fetchCommittedOffsets(Collections.singleton(t1p), time.timer(Long.MAX_VALUE)));

        Assertions.assertEquals(Collections.singleton("test1"), thrown.unauthorizedTopics());
    }

    /**
     * Verifies that refreshing committed offsets still applies offset 100 when the first offset
     * fetch response carries {@code COORDINATOR_LOAD_IN_PROGRESS} and the second one succeeds.
     */
    @Test
    public void testRefreshOffsetLoadInProgress() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        // First answer reports the load-in-progress error, second one carries the offset.
        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Collections.emptyMap()));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertTrue(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(100L, subscriptions.position(t1p).offset);
    }

    /**
     * Verifies that an offset fetch response carrying {@code GROUP_AUTHORIZATION_FAILED} makes
     * {@code refreshCommittedOffsetsIfNeeded} throw a {@code GroupAuthorizationException} that
     * reports the group id {@code "test-group"}.
     */
    @Test
    public void testRefreshOffsetsGroupNotAuthorized() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED, Collections.emptyMap()));

        // assertThrows replaces the try/fail/catch idiom and matches the other tests in this class.
        GroupAuthorizationException thrown = Assertions.assertThrows(
            GroupAuthorizationException.class,
            () -> coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)),
            "Expected group authorization error");

        Assertions.assertEquals("test-group", thrown.groupId());
    }

    /**
     * Verifies offset refresh across an {@code UNSTABLE_OFFSET_COMMIT} response: with
     * zero-timeout timers the partition is still initializing after the first refresh call, and
     * the second call applies offset 100 from the following successful response.
     */
    @Test
    public void testRefreshOffsetWithPendingTransactions() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.UNSTABLE_OFFSET_COMMIT, "", -1L));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());

        // First attempt: the partition must remain in the initializing set.
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());

        // Second attempt: the committed offset is applied.
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(0L));
        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertTrue(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(100L, subscriptions.position(t1p).offset);
    }

    /**
     * Verifies that an offset fetch response carrying {@code UNKNOWN_TOPIC_OR_PARTITION} for the
     * assigned partition makes {@code refreshCommittedOffsetsIfNeeded} throw a
     * {@link KafkaException}.
     */
    @Test
    public void testRefreshOffsetUnknownTopicOrPartition() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));

        Assertions.assertThrows(KafkaException.class,
            () -> coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that refreshing committed offsets still applies offset 100 when the first offset
     * fetch response carries {@code NOT_COORDINATOR} and is followed by a coordinator lookup
     * response and a successful offset fetch response.
     */
    @Test
    public void testRefreshOffsetNotCoordinatorForConsumer() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        // Queued in consumption order: failed fetch, coordinator lookup, successful fetch.
        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR, Collections.emptyMap()));
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertTrue(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(100L, subscriptions.position(t1p).offset);
    }

    /**
     * Verifies the state after an offset fetch response that returns offset {@code -1} for the
     * assigned partition: the partition stays initializing, no reset is requested, not all
     * fetch positions are known, and the partition has no position.
     */
    @Test
    public void testRefreshOffsetWithNoFetchableOffsets() {
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        subscriptions.assignFromUser(Collections.singleton(t1p));
        client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", -1L));
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.singleton(t1p), subscriptions.initializingPartitions());
        Assertions.assertEquals(Collections.emptySet(),
            subscriptions.partitionsNeedingReset(time.milliseconds()));
        Assertions.assertFalse(subscriptions.hasAllFetchPositions());
        Assertions.assertNull(subscriptions.position(t1p));
    }

    /**
     * Verifies that refreshing committed offsets leaves the coordinator unknown when the only
     * assigned partition already has a position from an explicit seek to offset 500.
     */
    @Test
    public void testNoCoordinatorDiscoveryIfPositionsKnown() {
        Assertions.assertTrue(coordinator.coordinatorUnknown());

        subscriptions.assignFromUser(Collections.singleton(t1p));
        subscriptions.seek(t1p, 500L);
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertTrue(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(500L, subscriptions.position(t1p).offset);
        // No response was queued, and the coordinator is still reported as unknown.
        Assertions.assertTrue(coordinator.coordinatorUnknown());
    }

    /**
     * Verifies that refreshing committed offsets leaves the coordinator unknown when the only
     * assigned partition has a pending {@code EARLIEST} offset reset, and that the reset
     * request is left intact.
     */
    @Test
    public void testNoCoordinatorDiscoveryIfPartitionAwaitingReset() {
        Assertions.assertTrue(coordinator.coordinatorUnknown());

        subscriptions.assignFromUser(Collections.singleton(t1p));
        subscriptions.requestOffsetReset(t1p, OffsetResetStrategy.EARLIEST);
        coordinator.refreshCommittedOffsetsIfNeeded(time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.emptySet(), subscriptions.initializingPartitions());
        Assertions.assertFalse(subscriptions.hasAllFetchPositions());
        Assertions.assertEquals(Collections.singleton(t1p),
            subscriptions.partitionsNeedingReset(time.milliseconds()));
        Assertions.assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(t1p));
        // No response was queued, and the coordinator is still reported as unknown.
        Assertions.assertTrue(coordinator.coordinatorUnknown());
    }

    /**
     * Verifies that {@code ensureActiveGroup()} throws an {@link AuthenticationException} when
     * the mock client has a pending authentication error registered for the fixture node.
     */
    @Test
    public void testAuthenticationFailureInEnsureActiveGroup() {
        client.createPendingAuthenticationError(node, 300L);

        // assertThrows replaces a try/fail block whose catch clause was empty.
        Assertions.assertThrows(
            AuthenticationException.class,
            () -> coordinator.ensureActiveGroup(),
            "Expected an authentication error.");
    }

    /**
     * Reads the {@code assigned-partitions} metric from a background thread while the main
     * thread repeatedly grows the user assignment, and asserts that no read threw.
     *
     * <p>The background thread stores the latest metric value and stops on the first exception,
     * which it records for the final assertion. The main thread waits until the reader has seen
     * the full partition count or has recorded a failure.
     *
     * @throws Exception if waiting for the condition or joining the reader thread fails
     */
    @Test
    public void testThreadSafeAssignedPartitionsMetric() throws Exception {
        final KafkaMetric metric = metrics.metric(new MetricName(
            "assigned-partitions", "consumertest-group-coordinator-metrics", "", Collections.emptyMap()));
        final AtomicBoolean done = new AtomicBoolean();
        final AtomicReference<Exception> readerFailure = new AtomicReference<>();
        final AtomicInteger observedSize = new AtomicInteger();

        Thread reader = new Thread(() -> {
            while (!done.get()) {
                try {
                    observedSize.set(((Double) metric.metricValue()).intValue());
                } catch (Exception e) {
                    readerFailure.set(e);
                    return;
                }
            }
        });
        reader.start();

        try {
            client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
            coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

            // Single source of truth for both the loop bound and the awaited metric value.
            final int totalPartitions = 10;
            final Set<TopicPartition> partitions = new HashSet<>();
            for (int partition = 0; partition < totalPartitions; partition++) {
                partitions.add(new TopicPartition("test1", partition));
                // Re-assign on every iteration so the metric changes while the reader is polling it.
                subscriptions.assignFromUser(partitions);
            }

            TestUtils.waitForCondition(
                () -> observedSize.get() == totalPartitions || readerFailure.get() != null,
                "Failed to observe expected assignment change");
        } finally {
            // Always stop the reader, even when setup or the wait fails, so the thread does not
            // keep spinning after the test.
            done.set(true);
            reader.join();
        }

        Assertions.assertNull(readerFailure.get(), "Failed fetching the metric at least once");
    }

    /**
     * Runs the graceful-close scenario for a coordinator that obtained its assignment through a
     * topic subscription; a leave-group request is expected in addition to the offset commit.
     */
    @Test
    public void testCloseDynamicAssignment() {
        // try-with-resources closes the coordinator again on exit and attaches any failure from
        // that close as a suppressed exception, exactly like the hand-written boilerplate did.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), true)) {
            gracefulCloseTest(closingCoordinator, true);
        }
    }

    /**
     * Runs the graceful-close scenario for a coordinator whose partitions were assigned manually;
     * an offset commit is expected but no leave-group request.
     */
    @Test
    public void testCloseManualAssignment() {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty(), true)) {
            gracefulCloseTest(closingCoordinator, false);
        }
    }

    /**
     * With a manual assignment and the coordinator lost via {@code NOT_COORDINATOR}, verifies
     * that a close with a 1000 ms timeout stays blocked until 1000 ms of mock time have elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(false, true, Optional.empty(), true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.NOT_COORDINATOR);
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, 1000L, 1000L, 1000L);
        }
    }

    /**
     * With a subscription-based assignment and the coordinator lost via {@code NOT_COORDINATOR},
     * verifies that a close with a 1000 ms timeout completes without any mock time having to
     * elapse (expected minimum and maximum wait are both 0).
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty(), true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.NOT_COORDINATOR);
            closeVerifyTimeout(closingCoordinator, 1000L, 0L, 0L);
        }
    }

    /**
     * With a subscription-based assignment and the coordinator lost via {@code NOT_COORDINATOR},
     * verifies that a close with a 1000 ms timeout stays blocked until 1000 ms of mock time have
     * elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.NOT_COORDINATOR);
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, 1000L, 1000L, 1000L);
        }
    }

    /**
     * With a subscription-based assignment and the coordinator lost via
     * {@code COORDINATOR_NOT_AVAILABLE}, verifies that a close with a 1000 ms timeout completes
     * without any mock time having to elapse (expected minimum and maximum wait are both 0).
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty(), true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            closeVerifyTimeout(closingCoordinator, 1000L, 0L, 0L);
        }
    }

    /**
     * Using the test's group instance id and with the coordinator lost via
     * {@code COORDINATOR_NOT_AVAILABLE}, verifies that a close with a 1000 ms timeout stays
     * blocked until 1000 ms of mock time have elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, 1000L, 1000L, 1000L);
        }
    }

    /**
     * Using the test's group instance id and with the coordinator lost via
     * {@code COORDINATOR_NOT_AVAILABLE}, verifies that a close requested with
     * {@code Long.MAX_VALUE} (capped to 30000 ms by {@code closeVerifyTimeout}) stays blocked
     * until 30000 ms of mock time have elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            makeCoordinatorUnknown(closingCoordinator, Errors.COORDINATOR_NOT_AVAILABLE);
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    /**
     * With no responses prepared on the mock client, verifies that a close requested with
     * {@code Long.MAX_VALUE} (capped to 30000 ms by {@code closeVerifyTimeout}) stays blocked
     * until 30000 ms of mock time have elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseNoResponseForCommit() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    /**
     * For a coordinator built with its second setup flag off and no group instance id, and with
     * no responses prepared on the mock client, verifies that a close requested with
     * {@code Long.MAX_VALUE} (capped to 30000 ms by {@code closeVerifyTimeout}) stays blocked
     * until 30000 ms of mock time have elapsed.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseNoResponseForLeaveGroup() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.empty(), true)) {
            closeVerifyTimeout(closingCoordinator, Long.MAX_VALUE, 30000L, 30000L);
        }
    }

    /**
     * Verifies that a close with a zero timeout completes without any mock time having to elapse
     * (expected minimum and maximum wait are both 0).
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testCloseNoWait() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            this.time.sleep(2000L);
            closeVerifyTimeout(closingCoordinator, 0L, 0L, 0L);
        }
    }

    /**
     * After joining the group and closing the coordinator, verifies that no live thread in the
     * current thread group has a name containing {@code "test-group"}.
     *
     * @throws Exception if the timing verification in {@code closeVerifyTimeout} fails
     */
    @Test
    public void testHeartbeatThreadClose() throws Exception {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator closingCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            closingCoordinator.ensureActiveGroup();
            this.time.sleep(5100L);
            // Give other runnable threads a chance to run before closing.
            Thread.yield();
            closeVerifyTimeout(closingCoordinator, Long.MAX_VALUE, 30000L, 30000L);

            // Snapshot the live threads and make sure none is named after the group.
            Thread[] liveThreads = new Thread[Thread.activeCount()];
            int liveCount = Thread.enumerate(liveThreads);
            for (int idx = 0; idx < liveCount; idx++) {
                Assertions.assertFalse(liveThreads[idx].getName().contains("test-group"), "Heartbeat thread active after close");
            }
        }
    }

    /**
     * Marks the coordinator unknown, prepares a coordinator lookup response and an offset commit
     * response, and verifies that {@code maybeAutoCommitOffsetsAsync} leaves the coordinator
     * known again with the partition position still at 100.
     */
    @Test
    public void testAutoCommitAfterCoordinatorBackToService() {
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator autoCommitCoordinator = buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, true, this.subscriptions)) {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
            this.subscriptions.seek(this.t1p, 100L);

            autoCommitCoordinator.markCoordinatorUnknown("test cause");
            Assertions.assertTrue(autoCommitCoordinator.coordinatorUnknown());

            this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
            prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.NONE);

            this.time.sleep(2000L);
            autoCommitCoordinator.maybeAutoCommitOffsetsAsync(this.time.milliseconds());

            Assertions.assertFalse(autoCommitCoordinator.coordinatorUnknown());
            Assertions.assertEquals(100L, this.subscriptions.position(this.t1p).offset);
        }
    }

    /**
     * Verifies that a synchronous offset commit answered with {@code FENCED_INSTANCE_ID} throws
     * {@link FencedInstanceIdException}.
     */
    @Test
    public void testCommitOffsetRequestSyncWithFencedInstanceIdException() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));

        final long committedOffset = 100L;
        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, committedOffset), Errors.FENCED_INSTANCE_ID);

        Assertions.assertThrows(
            FencedInstanceIdException.class,
            () -> this.coordinator.commitOffsetsSync(
                Collections.singletonMap(this.t1p, new OffsetAndMetadata(committedOffset)),
                this.time.timer(Long.MAX_VALUE)));
    }

    /**
     * Verifies that the asynchronous commit scenario in {@code receiveFencedInstanceIdException}
     * throws {@link FencedInstanceIdException}.
     */
    @Test
    public void testCommitOffsetRequestAsyncWithFencedInstanceIdException() {
        Assertions.assertThrows(
            FencedInstanceIdException.class,
            () -> receiveFencedInstanceIdException());
    }

    /**
     * Verifies that once the asynchronous commit scenario has thrown
     * {@link FencedInstanceIdException}, a further asynchronous commit and a further synchronous
     * commit each throw it as well.
     */
    @Test
    public void testCommitOffsetRequestAsyncAlwaysReceiveFencedException() {
        // First failure: the fenced response delivered through the completed-callback path.
        Assertions.assertThrows(
            FencedInstanceIdException.class,
            () -> receiveFencedInstanceIdException());

        // Any later commit attempt, async or sync, must fail the same way.
        Assertions.assertThrows(
            FencedInstanceIdException.class,
            () -> this.coordinator.commitOffsetsAsync(
                Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)),
                new MockCommitCallback()));
        Assertions.assertThrows(
            FencedInstanceIdException.class,
            () -> this.coordinator.commitOffsetsSync(
                Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)),
                this.time.timer(Long.MAX_VALUE)));
    }

    /**
     * Checks the group metadata reported by the default coordinator before it has joined
     * (generation -1, empty member id, no group instance id) and by a coordinator that has
     * joined with the test's group instance id (generation 1, member id {@code "consumer"}).
     */
    @Test
    public void testGetGroupMetadata() {
        ConsumerGroupMetadata initialMetadata = this.coordinator.groupMetadata();
        Assertions.assertNotNull(initialMetadata);
        Assertions.assertEquals("test-group", initialMetadata.groupId());
        Assertions.assertEquals(-1, initialMetadata.generationId());
        Assertions.assertEquals("", initialMetadata.memberId());
        Assertions.assertFalse(initialMetadata.groupInstanceId().isPresent());

        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator joinedCoordinator = prepareCoordinatorForCloseTest(true, true, this.groupInstanceId, true)) {
            joinedCoordinator.ensureActiveGroup();
            ConsumerGroupMetadata joinedMetadata = joinedCoordinator.groupMetadata();
            Assertions.assertNotNull(joinedMetadata);
            Assertions.assertEquals("test-group", joinedMetadata.groupId());
            Assertions.assertEquals(1, joinedMetadata.generationId());
            Assertions.assertEquals("consumer", joinedMetadata.memberId());
            Assertions.assertEquals(this.groupInstanceId, joinedMetadata.groupInstanceId());
        }
    }

    /**
     * Completes two joins (generation 1 with one partition, generation 2 with none) using a
     * listener whose {@code onPartitionsRevoked} asserts that the coordinator already reports
     * generation 2 when the callback runs.
     */
    @Test
    public void shouldUpdateConsumerGroupMetadataBeforeCallbacks() {
        final MockRebalanceListener generationCheckingListener = new MockRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                final int generationSeenByCallback = ConsumerCoordinatorTest.this.coordinator.groupMetadata().generationId();
                Assertions.assertEquals(2, generationSeenByCallback);
            }
        };
        this.subscriptions.subscribe(Collections.singleton("test1"), generationCheckingListener);

        // Generation 1: the member owns t1p.
        final ConsumerPartitionAssignor.Assignment singlePartition =
            new ConsumerPartitionAssignor.Assignment(Collections.singletonList(this.t1p), ByteBuffer.wrap(new byte[0]));
        this.coordinator.onJoinComplete(1, "memberId", this.partitionAssignor.name(), ConsumerProtocol.serializeAssignment(singlePartition));

        // Generation 2: the assignment is empty.
        final ConsumerPartitionAssignor.Assignment noPartitions =
            new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), ByteBuffer.wrap(new byte[0]));
        this.coordinator.onJoinComplete(2, "memberId", this.partitionAssignor.name(), ConsumerProtocol.serializeAssignment(noPartitions));
    }

    /**
     * Drives a rebalance that is interrupted part-way (join completes, generation data is then
     * cleared with {@code maybeLeaveGroup}, and the sync response arrives afterwards) and
     * verifies that a subsequent join/sync round completes, leaving no pending responses or
     * in-flight requests. After the coordinator is closed, the listener's lost partitions are
     * compared against {@code getLost}.
     */
    @Test
    public void testPrepareJoinAndRejoinAfterFailedRebalance() {
        final List<TopicPartition> partitions = Collections.singletonList(this.t1p);
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator rejoiningCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
            rejoiningCoordinator.ensureActiveGroup();

            // A commit rejected with REBALANCE_IN_PROGRESS surfaces as an exception.
            prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
            Assertions.assertThrows(RebalanceInProgressException.class, () -> {
                rejoiningCoordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
            });
            Assertions.assertFalse(this.client.hasPendingResponses());
            Assertions.assertFalse(this.client.hasInFlightRequests());

            // First attempt: the join response is consumed, one request is left in flight,
            // and the timer runs out before the rebalance finishes.
            this.client.prepareResponse(joinGroupFollowerResponse(42, "consumer-42", "leader", Errors.NONE));
            MockTime rebalanceTime = new MockTime(1L);
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(rebalanceTime.timer(100L)));
            Assertions.assertFalse(this.client.hasPendingResponses());
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertEquals(42, rejoiningCoordinator.generation().generationId);
            Assertions.assertEquals("consumer-42", rejoiningCoordinator.generation().memberId);

            // Clear the generation while that request is still outstanding.
            rejoiningCoordinator.maybeLeaveGroup("Clear generation data.");
            Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, rejoiningCoordinator.generation());

            // Answering the outstanding request does not complete the join.
            this.client.respond(syncGroupResponse(partitions, Errors.NONE));
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(rebalanceTime.timer(1L)));
            Assertions.assertFalse(this.client.hasPendingResponses());
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            // NOTE(review): looks like leftover debug output; kept to preserve behavior.
            System.out.println(this.client.requests());

            // Second attempt: a fresh join + sync round succeeds.
            this.client.respond(joinGroupFollowerResponse(42, "consumer-42", "leader", Errors.NONE));
            this.client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
            Assertions.assertTrue(rejoiningCoordinator.joinGroupIfNeeded(rebalanceTime.timer(3000L)));
            Assertions.assertFalse(this.client.hasPendingResponses());
            Assertions.assertFalse(this.client.hasInFlightRequests());
        }

        // These checks run after the coordinator has been closed. They sit outside the try block
        // so that a failing assertion does not trigger a second close() of the coordinator.
        Collection<TopicPartition> lost = getLost(partitions);
        Assertions.assertEquals(lost.isEmpty() ? null : lost, this.rebalanceListener.lost);
        Assertions.assertEquals(lost.size(), this.rebalanceListener.lostCount);
    }

    /**
     * Fails the sync-group step of a rebalance with {@code UNKNOWN_MEMBER_ID} and verifies that
     * the generation is reset to {@code NO_GENERATION} with an empty member id. After the
     * coordinator is closed, the listener's lost partitions are compared against {@code getLost}.
     */
    @Test
    public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterDroppingOutOfTheGroup() {
        final List<TopicPartition> partitions = Collections.singletonList(this.t1p);
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator rejoiningCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
            SystemTime realTime = new SystemTime();
            rejoiningCoordinator.ensureActiveGroup();

            prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
            Assertions.assertThrows(RebalanceInProgressException.class, () -> {
                rejoiningCoordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
            });

            // The join succeeds, but the sync is rejected because the member is unknown.
            this.client.prepareResponse(joinGroupFollowerResponse(42, "consumer-42", "leader", Errors.NONE));
            this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(realTime.timer(1000L)));

            Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION, rejoiningCoordinator.generation());
            Assertions.assertEquals("", rejoiningCoordinator.generation().memberId);

            // No further responses are prepared, so the next attempt cannot complete either.
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(realTime.timer(1000L)));
        }

        // These checks run after the coordinator has been closed. They sit outside the try block
        // so that a failing assertion does not trigger a second close() of the coordinator.
        Collection<TopicPartition> lost = getLost(partitions);
        Assertions.assertEquals(lost.isEmpty() ? 0 : 1, this.rebalanceListener.lostCount);
        Assertions.assertEquals(lost.isEmpty() ? null : lost, this.rebalanceListener.lost);
    }

    /**
     * Fails the sync-group step of a rebalance with {@code ILLEGAL_GENERATION} and verifies that
     * the generation id and protocol name are reset to those of {@code NO_GENERATION} while the
     * member id {@code "consumer-42"} is kept. After the coordinator is closed, the listener's
     * lost partitions are compared against {@code getLost}.
     */
    @Test
    public void shouldLoseAllOwnedPartitionsBeforeRejoiningAfterResettingGenerationId() {
        final List<TopicPartition> partitions = Collections.singletonList(this.t1p);
        // try-with-resources replaces the manual close / addSuppressed boilerplate.
        try (ConsumerCoordinator rejoiningCoordinator = prepareCoordinatorForCloseTest(true, false, Optional.of("group-id"), true)) {
            SystemTime realTime = new SystemTime();
            rejoiningCoordinator.ensureActiveGroup();

            prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
            Assertions.assertThrows(RebalanceInProgressException.class, () -> {
                rejoiningCoordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.time.timer(Long.MAX_VALUE));
            });

            // The join succeeds, but the sync is rejected because the generation is stale.
            this.client.prepareResponse(joinGroupFollowerResponse(42, "consumer-42", "leader", Errors.NONE));
            this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(realTime.timer(1000L)));

            // Only the generation id and protocol are reset; the member id survives.
            Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION.generationId, rejoiningCoordinator.generation().generationId);
            Assertions.assertEquals(AbstractCoordinator.Generation.NO_GENERATION.protocolName, rejoiningCoordinator.generation().protocolName);
            Assertions.assertEquals("consumer-42", rejoiningCoordinator.generation().memberId);

            // No further responses are prepared, so the next attempt cannot complete either.
            Assertions.assertFalse(rejoiningCoordinator.joinGroupIfNeeded(realTime.timer(1000L)));
        }

        // These checks run after the coordinator has been closed. They sit outside the try block
        // so that a failing assertion does not trigger a second close() of the coordinator.
        Collection<TopicPartition> lost = getLost(partitions);
        Assertions.assertEquals(lost.isEmpty() ? 0 : 1, this.rebalanceListener.lostCount);
        Assertions.assertEquals(lost.isEmpty() ? null : lost, this.rebalanceListener.lost);
    }

    /**
     * Rebuilds the coordinator with a {@code RackAwareAssignor} and the rack id {@code "rack-a"},
     * runs a leader join, and verifies both the resulting assignment and that the assignor
     * recorded exactly that rack id.
     */
    @Test
    public void testSubscriptionRackId() {
        final String rackId = "rack-a";
        final String memberId = "consumer";
        final String topic = "test1";

        // Replace the default metrics and coordinator with rack-aware ones.
        this.metrics.close();
        this.coordinator.close(this.time.timer(0L));
        this.metrics = new Metrics(this.time);
        RackAwareAssignor rackAssignor = new RackAwareAssignor();
        this.coordinator = new ConsumerCoordinator(
            this.rebalanceConfig,
            new LogContext(),
            this.consumerClient,
            Collections.singletonList(rackAssignor),
            this.metadata,
            this.subscriptions,
            this.metrics,
            "consumertest-group",
            this.time,
            false,
            2000,
            (ConsumerInterceptors) null,
            false,
            rackId);

        this.subscriptions.subscribe(Collections.singleton(topic), this.rebalanceListener);
        this.client.updateMetadata(this.metadataResponse);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));

        // Become leader, hand t1p to ourselves, and complete the rebalance.
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(memberId, Collections.singletonList(topic));
        rackAssignor.prepare(Collections.singletonMap(memberId, Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, memberId, memberSubscriptions, false, Errors.NONE, Optional.of(rackId)));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.timer(Long.MAX_VALUE));

        Assertions.assertEquals(Collections.singleton(this.t1p), this.coordinator.subscriptionState().assignedPartitions());
        Assertions.assertEquals(Collections.singleton(rackId), rackAssignor.rackIds);
    }

    /**
     * Runs {@code supportStableFlag} with a maximum OFFSET_FETCH version of 6 and expects the
     * fetch of committed offsets to throw.
     */
    @Test
    public void testThrowOnUnsupportedStableFlag() {
        final short maxOffsetFetchVersion = 6;
        final boolean expectThrows = true;
        supportStableFlag(maxOffsetFetchVersion, expectThrows);
    }

    /**
     * Runs {@code supportStableFlag} with a maximum OFFSET_FETCH version of 7 and expects the
     * fetch of committed offsets to succeed.
     */
    @Test
    public void testNoThrowWhenStableFlagIsSupported() {
        final short maxOffsetFetchVersion = 7;
        final boolean expectThrows = false;
        supportStableFlag(maxOffsetFetchVersion, expectThrows);
    }

    /**
     * Builds a dedicated coordinator, restricts the node's OFFSET_FETCH API versions to
     * {@code [0, s]}, prepares an offset-fetch response for {@code t1p}, and then fetches the
     * committed offsets.
     *
     * @param s highest OFFSET_FETCH version advertised to the mock client; values below 8 use the
     *          {@code OffsetFetchResponse} constructor directly, others the
     *          {@code offsetFetchResponse} helper
     * @param z when {@code true}, the fetch is expected to throw
     *          {@code UnsupportedVersionException}; otherwise the returned offset, leader epoch
     *          and metadata are verified
     */
    private void supportStableFlag(short s, boolean z) {
        // NOTE(review): this coordinator is never closed here; confirm whether that is intended.
        ConsumerCoordinator stableFlagCoordinator = new ConsumerCoordinator(this.rebalanceConfig, new LogContext(), this.consumerClient, this.assignors, this.metadata, this.subscriptions, new Metrics(this.time), "consumertest-group", this.time, false, 2000, (ConsumerInterceptors) null, true, (String) null);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, s));

        final long offset = 500L;
        final String metadata = "blahblah";
        // Parameterized instead of raw Optional so the compiler checks the epoch type.
        final Optional<Integer> leaderEpoch = Optional.of(15);
        OffsetFetchResponse.PartitionData partitionData = new OffsetFetchResponse.PartitionData(offset, leaderEpoch, metadata, Errors.NONE);
        if (s < 8) {
            this.client.prepareResponse(new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, partitionData)));
        } else {
            this.client.prepareResponse(offsetFetchResponse(Errors.NONE, Collections.singletonMap(this.t1p, partitionData)));
        }

        if (z) {
            Assertions.assertThrows(UnsupportedVersionException.class, () -> {
                stableFlagCoordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
            });
            return;
        }

        // Parameterized instead of raw Map so the lookup below is type-checked.
        Map<TopicPartition, OffsetAndMetadata> committed = stableFlagCoordinator.fetchCommittedOffsets(Collections.singleton(this.t1p), this.time.timer(Long.MAX_VALUE));
        Assertions.assertNotNull(committed);
        Assertions.assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), committed.get(this.t1p));
    }

    /**
     * Issues an asynchronous commit of offset 100 for {@code t1p} that the mock client answers
     * with {@code FENCED_INSTANCE_ID}, then invokes the completed commit callbacks. Callers wrap
     * this method in {@code assertThrows(FencedInstanceIdException.class, ...)}.
     */
    private void receiveFencedInstanceIdException() {
        final long committedOffset = 100L;

        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));

        prepareOffsetCommitRequest(Collections.singletonMap(this.t1p, committedOffset), Errors.FENCED_INSTANCE_ID);
        this.coordinator.commitOffsetsAsync(
            Collections.singletonMap(this.t1p, new OffsetAndMetadata(committedOffset)),
            new MockCommitCallback());
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
    }

    /**
     * Builds a coordinator with a known group coordinator and {@code t1p} positioned at offset
     * 100, ready for the close-related tests. As a side effect, replaces
     * {@code this.rebalanceConfig} with one built from {@code optional}.
     *
     * @param z        when {@code true}, subscribes to topic {@code "test1"} and joins the group
     *                 as a follower; when {@code false}, assigns {@code t1p} manually
     * @param z2       forwarded as the fourth argument of {@code buildCoordinator}
     *                 (presumably the auto-commit switch; confirm against that method)
     * @param optional forwarded to {@code buildRebalanceConfig} (presumably the group instance id)
     * @param z3       when {@code true}, polls the coordinator once before returning
     * @return the prepared coordinator; the caller is responsible for closing it
     */
    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean z, boolean z2, Optional<String> optional, boolean z3) {
        this.rebalanceConfig = buildRebalanceConfig(optional);
        ConsumerCoordinator preparedCoordinator = buildCoordinator(this.rebalanceConfig, new Metrics(), this.assignors, z2, this.subscriptions);

        // Make the group coordinator known first.
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE));
        preparedCoordinator.ensureCoordinatorReady(this.time.timer(Long.MAX_VALUE));

        if (!z) {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        } else {
            this.subscriptions.subscribe(Collections.singleton("test1"), this.rebalanceListener);
            this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
            preparedCoordinator.joinGroupIfNeeded(this.time.timer(Long.MAX_VALUE));
        }

        this.subscriptions.seek(this.t1p, 100L);
        if (z3) {
            preparedCoordinator.poll(this.time.timer(Long.MAX_VALUE));
        }
        return preparedCoordinator;
    }

    /**
     * Sends a heartbeat from the given coordinator, answers it with {@code errors}, and asserts
     * that the coordinator is unknown afterwards. The mock clock is advanced by 10 s before the
     * heartbeat is sent and by another 10 s before the response is polled.
     *
     * @param consumerCoordinator coordinator whose group coordinator should become unknown
     * @param errors              error carried by the heartbeat response
     */
    private void makeCoordinatorUnknown(ConsumerCoordinator consumerCoordinator, Errors errors) {
        final long clockStepMs = 10000L;

        this.time.sleep(clockStepMs);
        consumerCoordinator.sendHeartbeatRequest();
        this.client.prepareResponse(heartbeatResponse(errors));

        this.time.sleep(clockStepMs);
        this.consumerClient.poll(this.time.timer(0L));

        final boolean coordinatorLost = consumerCoordinator.coordinatorUnknown();
        Assertions.assertTrue(coordinatorLost);
    }

    /**
     * Closes the coordinator on a separate thread and verifies, by advancing the mock clock, how
     * long the close stays blocked.
     *
     * @param consumerCoordinator coordinator to close
     * @param j  requested close timeout in ms; capped at 30000 before being turned into the timer
     *           handed to {@code close}
     * @param j2 when positive, the mock clock is advanced by {@code j2 - 1} ms and the close must
     *           still be pending after a 500 ms real-time wait
     * @param j3 when non-negative, the mock clock is then advanced by {@code (j3 - j2) + 2} ms,
     *           after which the close must finish within 2 s of real time
     * @throws Exception if the close task fails or does not finish within the final wait
     */
    private void closeVerifyTimeout(ConsumerCoordinator consumerCoordinator, long j, long j2, long j3) throws Exception {
        ExecutorService closeExecutor = Executors.newSingleThreadExecutor();
        try {
            boolean coordinatorUnknown = consumerCoordinator.coordinatorUnknown();
            Future<?> closeFuture = closeExecutor.submit(() -> {
                consumerCoordinator.close(this.time.timer(Math.min(j, 30000L)));
            });

            // Give the close thread time to get going: a fixed pause when the coordinator is
            // unknown, otherwise wait until the mock client has seen a request.
            if (coordinatorUnknown) {
                Thread.sleep(200L);
            } else {
                this.client.waitForRequests(1, 1000L);
            }

            if (j2 > 0) {
                this.time.sleep(j2 - 1);
                try {
                    closeFuture.get(500L, TimeUnit.MILLISECONDS);
                    Assertions.fail("Close completed ungracefully without waiting for timeout");
                } catch (TimeoutException expected) {
                    // Expected: close is still blocked because the minimum time has not elapsed.
                }
            }
            if (j3 >= 0) {
                this.time.sleep((j3 - j2) + 2);
            }
            closeFuture.get(2000L, TimeUnit.MILLISECONDS);
        } finally {
            // Single cleanup point for both the success and the failure path.
            closeExecutor.shutdownNow();
        }
    }

    /**
     * Closes the coordinator against prepared responses and verifies which requests were sent.
     *
     * <p>Queues an offset-commit response, optionally a leave-group response, and a second
     * offset-commit response; each matcher records that it was consulted and accepts only
     * requests whose group id is {@code "test-group"}. After {@code close()} the method asserts
     * that a commit was requested and that a leave-group request was seen exactly when
     * {@code z} is true. When {@code z} is true it also asserts that the rebalance listener
     * recorded a single revocation of {@code t1p}.
     *
     * @param consumerCoordinator coordinator to close
     * @param z whether a leave-group request is expected during close
     */
    private void gracefulCloseTest(ConsumerCoordinator consumerCoordinator, boolean z) {
        final AtomicBoolean commitSeen = new AtomicBoolean();
        final AtomicBoolean leaveGroupSeen = new AtomicBoolean();

        // Same matcher logic is used for both queued commit responses.
        MockClient.RequestMatcher commitMatcher = request -> {
            commitSeen.set(true);
            OffsetCommitRequest commit = (OffsetCommitRequest) request;
            return commit.data().groupId().equals("test-group");
        };

        this.client.prepareResponse(commitMatcher, (AbstractResponse) new OffsetCommitResponse(new OffsetCommitResponseData()));
        if (z) {
            MockClient.RequestMatcher leaveMatcher = request -> {
                leaveGroupSeen.set(true);
                LeaveGroupRequest leave = (LeaveGroupRequest) request;
                return leave.data().groupId().equals("test-group");
            };
            LeaveGroupResponseData leaveData = new LeaveGroupResponseData().setErrorCode(Errors.NONE.code());
            this.client.prepareResponse(leaveMatcher, (AbstractResponse) new LeaveGroupResponse(leaveData));
        }
        this.client.prepareResponse(commitMatcher, (AbstractResponse) new OffsetCommitResponse(new OffsetCommitResponseData()));

        consumerCoordinator.close();

        Assertions.assertTrue(commitSeen.get(), "Commit not requested");
        Assertions.assertEquals(Boolean.valueOf(z), Boolean.valueOf(leaveGroupSeen.get()), "leaveGroupRequested should be " + z);
        if (!z) {
            return;
        }
        Assertions.assertEquals(1, this.rebalanceListener.revokedCount);
        Assertions.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
    }

    /**
     * Creates a {@link ConsumerCoordinator} wired to this test's {@code consumerClient},
     * {@code metadata} and {@code time}, using a fresh {@code LogContext}.
     *
     * @param groupRebalanceConfig rebalance configuration passed straight through
     * @param metrics metrics registry passed straight through
     * @param list partition assignors passed straight through
     * @param z boolean flag forwarded to the constructor
     *          (presumably "auto-commit enabled"; confirm against the constructor)
     * @param subscriptionState subscription state passed straight through
     * @return the newly constructed coordinator
     */
    private ConsumerCoordinator buildCoordinator(GroupRebalanceConfig groupRebalanceConfig, Metrics metrics, List<ConsumerPartitionAssignor> list, boolean z, SubscriptionState subscriptionState) {
        final LogContext logContext = new LogContext();
        // NOTE(review): the string, 2000, and the trailing null/false/null are fixed test
        // values; their meaning depends on the constructor's parameter list. Confirm there.
        return new ConsumerCoordinator(
            groupRebalanceConfig,
            logContext,
            this.consumerClient,
            list,
            this.metadata,
            subscriptionState,
            metrics,
            "consumertest-group",
            this.time,
            z,
            2000,
            (ConsumerInterceptors) null,
            false,
            (String) null);
    }

    /**
     * Selects partitions from {@code list} according to the current {@code protocol}.
     *
     * <p>For switch-map case 1 the result is every element of {@code list}; for case 2 it is
     * the elements of {@code list} that are not contained in {@code list2}. Both results are
     * produced through {@code TestUtils.toSet}.
     *
     * <p>NOTE(review): cases 1 and 2 presumably correspond to the EAGER and COOPERATIVE
     * rebalance protocols; confirm against the synthetic switch map.
     *
     * @param list first partition list (presumably the currently owned partitions)
     * @param list2 second partition list (presumably the newly assigned partitions)
     * @return the selected partitions
     * @throws IllegalStateException if the protocol maps to neither case
     */
    private Collection<TopicPartition> getRevoked(List<TopicPartition> list, List<TopicPartition> list2) {
        switch (AnonymousClass17.$SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol[this.protocol.ordinal()]) {
            case 1:
                return TestUtils.toSet(list);
            case 2:
                // Typed copy so the caller's list is left untouched by removeAll.
                List<TopicPartition> remaining = new ArrayList<>(list);
                remaining.removeAll(list2);
                return TestUtils.toSet(remaining);
            default:
                throw new IllegalStateException("This should not happen");
        }
    }

    /**
     * Selects partitions according to the current {@code protocol}: an empty set for
     * switch-map case 1, or all of {@code list} (via {@code TestUtils.toSet}) for case 2.
     *
     * <p>NOTE(review): cases 1 and 2 presumably correspond to the EAGER and COOPERATIVE
     * rebalance protocols; confirm against the synthetic switch map.
     *
     * @param list partition list (presumably the currently owned partitions)
     * @return the selected partitions
     * @throws IllegalStateException if the protocol maps to neither case
     */
    private Collection<TopicPartition> getLost(List<TopicPartition> list) {
        final int protocolCase = AnonymousClass17.$SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol[this.protocol.ordinal()];
        if (protocolCase == 1) {
            return Collections.emptySet();
        }
        if (protocolCase == 2) {
            return TestUtils.toSet(list);
        }
        throw new IllegalStateException("This should not happen");
    }

    /**
     * Selects partitions from {@code list2} according to the current {@code protocol}.
     *
     * <p>For switch-map case 1 the result is every element of {@code list2}; for case 2 it is
     * the elements of {@code list2} that are not contained in {@code list}. Both results are
     * produced through {@code TestUtils.toSet}.
     *
     * <p>NOTE(review): cases 1 and 2 presumably correspond to the EAGER and COOPERATIVE
     * rebalance protocols; confirm against the synthetic switch map.
     *
     * @param list first partition list (presumably the currently owned partitions)
     * @param list2 second partition list (presumably the newly assigned partitions)
     * @return the selected partitions
     * @throws IllegalStateException if the protocol maps to neither case
     */
    private Collection<TopicPartition> getAdded(List<TopicPartition> list, List<TopicPartition> list2) {
        switch (AnonymousClass17.$SwitchMap$org$apache$kafka$clients$consumer$ConsumerPartitionAssignor$RebalanceProtocol[this.protocol.ordinal()]) {
            case 1:
                return TestUtils.toSet(list2);
            case 2:
                // Typed copy so the caller's list is left untouched by removeAll.
                List<TopicPartition> remaining = new ArrayList<>(list2);
                remaining.removeAll(list);
                return TestUtils.toSet(remaining);
            default:
                throw new IllegalStateException("This should not happen");
        }
    }

    /**
     * Builds a find-coordinator response for the key {@code "test-group"}.
     *
     * @param node node to report in the response
     * @param errors error to report in the response
     * @return the response produced by {@code FindCoordinatorResponse.prepareResponse}
     */
    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors errors) {
        FindCoordinatorResponse response = FindCoordinatorResponse.prepareResponse(errors, "test-group", node);
        return response;
    }

    /**
     * Builds a heartbeat response whose error code is taken from {@code errors}.
     *
     * @param errors error whose {@code code()} is stored in the response data
     * @return the heartbeat response
     */
    private HeartbeatResponse heartbeatResponse(Errors errors) {
        HeartbeatResponseData payload = new HeartbeatResponseData().setErrorCode(errors.code());
        return new HeartbeatResponse(payload);
    }

    /**
     * Convenience overload of the six-argument {@code joinGroupLeaderResponse} that passes
     * {@code false} for the boolean flag and an empty {@code Optional} for the last argument.
     *
     * @param i generation id
     * @param str member id (also used as the leader id by the delegate)
     * @param map topic lists keyed by member id
     * @param errors error to report in the response
     * @return the join-group response built by the delegate
     */
    private JoinGroupResponse joinGroupLeaderResponse(int i, String str, Map<String, List<String>> map, Errors errors) {
        final boolean flagOff = false;
        final Optional<String> absent = Optional.empty();
        return joinGroupLeaderResponse(i, str, map, flagOff, errors, absent);
    }

    /**
     * Builds a join-group response in which {@code str} is both the member id and the leader.
     *
     * <p>One response member is created per entry of {@code map}: the entry key becomes the
     * member id and the entry value (a topic list) is wrapped in a
     * {@code ConsumerPartitionAssignor.Subscription} and serialized as the member metadata.
     * The protocol name is taken from {@code partitionAssignor.name()} and the response is
     * created with {@code ApiKeys.JOIN_GROUP.latestVersion()}.
     *
     * @param i generation id
     * @param str id used for both {@code setLeader} and {@code setMemberId}
     * @param map topic lists keyed by member id
     * @param z value passed to {@code setSkipAssignment}
     * @param errors error whose code is stored in the response
     * @param optional last argument of each {@code Subscription}
     *                 (presumably the rack id; confirm against the constructor)
     * @return the join-group response
     */
    private JoinGroupResponse joinGroupLeaderResponse(int i, String str, Map<String, List<String>> map, boolean z, Errors errors, Optional<String> optional) {
        List<JoinGroupResponseData.JoinGroupResponseMember> members = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            ConsumerPartitionAssignor.Subscription subscription =
                new ConsumerPartitionAssignor.Subscription(entry.getValue(), (ByteBuffer) null, Collections.emptyList(), -1, optional);
            members.add(new JoinGroupResponseData.JoinGroupResponseMember()
                .setMemberId(entry.getKey())
                .setMetadata(ConsumerProtocol.serializeSubscription(subscription).array()));
        }
        JoinGroupResponseData data = new JoinGroupResponseData()
            .setErrorCode(errors.code())
            .setGenerationId(i)
            .setProtocolName(this.partitionAssignor.name())
            .setLeader(str)
            .setSkipAssignment(z)
            .setMemberId(str)
            .setMembers(members);
        return new JoinGroupResponse(data, ApiKeys.JOIN_GROUP.latestVersion());
    }

    /**
     * Builds a join-group response with an empty member list.
     *
     * <p>The protocol name is taken from {@code partitionAssignor.name()} and the response is
     * created with {@code ApiKeys.JOIN_GROUP.latestVersion()}.
     *
     * @param i generation id
     * @param str member id
     * @param str2 leader id
     * @param errors error whose code is stored in the response
     * @return the join-group response
     */
    private JoinGroupResponse joinGroupFollowerResponse(int i, String str, String str2, Errors errors) {
        JoinGroupResponseData data = new JoinGroupResponseData()
            .setErrorCode(errors.code())
            .setGenerationId(i)
            .setProtocolName(this.partitionAssignor.name())
            .setLeader(str2)
            .setMemberId(str)
            .setMembers(Collections.emptyList());
        return new JoinGroupResponse(data, ApiKeys.JOIN_GROUP.latestVersion());
    }

    /**
     * Builds a sync-group response carrying the given partitions as a serialized assignment.
     *
     * @param list partitions wrapped in a {@code ConsumerPartitionAssignor.Assignment}
     * @param errors error whose code is stored in the response
     * @return the sync-group response
     */
    private SyncGroupResponse syncGroupResponse(List<TopicPartition> list, Errors errors) {
        SyncGroupResponseData data = new SyncGroupResponseData().setErrorCode(errors.code());
        ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(list);
        byte[] assignmentBytes = Utils.toArray(ConsumerProtocol.serializeAssignment(assignment));
        return new SyncGroupResponse(data.setAssignment(assignmentBytes));
    }

    /**
     * Wraps a per-partition error map in an {@code OffsetCommitResponse}.
     *
     * @param map error to report for each partition
     * @return the offset-commit response
     */
    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> map) {
        OffsetCommitResponse response = new OffsetCommitResponse(map);
        return response;
    }

    /**
     * Builds an offset-fetch response for the single group {@code "test-group"}.
     *
     * @param errors error associated with the group
     * @param map partition data associated with the group
     * @return the offset-fetch response
     */
    private OffsetFetchResponse offsetFetchResponse(Errors errors, Map<TopicPartition, OffsetFetchResponse.PartitionData> map) {
        Map<String, Errors> errorsByGroup = Collections.singletonMap("test-group", errors);
        Map<String, Map<TopicPartition, OffsetFetchResponse.PartitionData>> partitionsByGroup =
            Collections.singletonMap("test-group", map);
        // NOTE(review): the leading 10 is presumably the throttle time in ms; confirm.
        return new OffsetFetchResponse(10, errorsByGroup, partitionsByGroup);
    }

    /**
     * Convenience overload of the five-argument {@code offsetFetchResponse} that passes an
     * empty {@code Optional} as the last argument.
     *
     * @param topicPartition partition the response describes
     * @param errors error attached to the partition data
     * @param str string passed to the partition data (presumably the commit metadata)
     * @param j long passed to the partition data (presumably the committed offset)
     * @return the offset-fetch response built by the delegate
     */
    private OffsetFetchResponse offsetFetchResponse(TopicPartition topicPartition, Errors errors, String str, long j) {
        final Optional<Integer> absent = Optional.empty();
        return offsetFetchResponse(topicPartition, errors, str, j, absent);
    }

    /**
     * Builds an offset-fetch response describing a single partition.
     *
     * <p>The supplied {@code errors} is attached to the partition data; the error passed to
     * the two-argument overload is always {@code Errors.NONE}.
     *
     * @param topicPartition partition the response describes
     * @param errors error attached to the partition data
     * @param str string passed to the partition data (presumably the commit metadata)
     * @param j long passed to the partition data (presumably the committed offset)
     * @param optional optional integer passed to the partition data
     *                 (presumably the leader epoch; confirm against the constructor)
     * @return the offset-fetch response
     */
    private OffsetFetchResponse offsetFetchResponse(TopicPartition topicPartition, Errors errors, String str, long j, Optional<Integer> optional) {
        OffsetFetchResponse.PartitionData partitionData = new OffsetFetchResponse.PartitionData(j, optional, str, errors);
        return offsetFetchResponse(Errors.NONE, Collections.singletonMap(topicPartition, partitionData));
    }

    /**
     * Creates a commit callback that sets the given flag when it is invoked without an
     * exception. The flag is left untouched when an exception is reported.
     *
     * @param atomicBoolean flag set to {@code true} on a callback with a null exception
     * @return the callback
     */
    private OffsetCommitCallback callback(AtomicBoolean atomicBoolean) {
        return (offsets, exception) -> {
            if (exception != null) {
                return;
            }
            atomicBoolean.set(true);
        };
    }

    /**
     * Walks the coordinator through coordinator discovery and a group join as a follower.
     *
     * <p>Queues a successful find-coordinator response and waits for the coordinator to be
     * ready, then queues a follower join-group response (generation 1, member
     * {@code "consumer"}, leader {@code "leader"}) plus a sync-group response carrying
     * {@code list}, and finally calls {@code joinGroupIfNeeded}.
     *
     * @param consumerCoordinator coordinator under test
     * @param list partitions returned in the sync-group response
     */
    private void joinAsFollowerAndReceiveAssignment(ConsumerCoordinator consumerCoordinator, List<TopicPartition> list) {
        // Step 1: coordinator lookup.
        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
        consumerCoordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));

        // Step 2: join as follower and receive the assignment.
        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        client.prepareResponse(syncGroupResponse(list, Errors.NONE));
        consumerCoordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
    }

    /**
     * Queues an offset-commit response for the expected offsets, passing {@code false} for the
     * boolean flag of the three-argument overload.
     *
     * @param map expected offsets keyed by partition
     * @param errors error to report for every partition
     */
    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors) {
        final boolean flagOff = false;
        prepareOffsetCommitRequest(map, errors, flagOff);
    }

    /**
     * Queues an offset-commit response with {@code Errors.NONE} for the expected offsets,
     * passing {@code true} for the boolean flag of {@code prepareOffsetCommitRequest}
     * (presumably "disconnect after responding", as the method name suggests).
     *
     * @param map expected offsets keyed by partition
     */
    private void prepareOffsetCommitRequestDisconnect(Map<TopicPartition, Long> map) {
        final boolean flagOn = true;
        prepareOffsetCommitRequest(map, Errors.NONE, flagOn);
    }

    /**
     * Queues, on the mock client, an offset-commit response that is matched against the
     * expected offsets.
     *
     * @param map expected offsets keyed by partition; its key set also determines which
     *            partitions appear in the response
     * @param errors error to report for every partition
     * @param z boolean forwarded as the third argument of {@code client.prepareResponse}
     *          (presumably "disconnected"; confirm against the mock client)
     */
    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors, boolean z) {
        MockClient.RequestMatcher matcher = offsetCommitRequestMatcher(map);
        OffsetCommitResponse response = offsetCommitResponse(partitionErrors(map.keySet(), errors));
        this.client.prepareResponse(matcher, response, z);
    }

    /**
     * Prepares the assignor and the mock client for a join as group leader followed by a sync.
     *
     * <p>The partition assignor is primed with {@code list2} for member {@code str}. A leader
     * join-group response is queued, followed by a sync-group response that is only matched by
     * a sync request whose member id equals {@code str}, whose generation id equals {@code i},
     * and whose group assignments contain {@code str}.
     *
     * @param str member id (also the leader)
     * @param i generation id
     * @param list topics listed for the member in the join-group response
     * @param list2 partitions handed to the assignor and returned in the sync-group response
     */
    private void prepareJoinAndSyncResponse(String str, int i, List<String> list, List<TopicPartition> list2) {
        partitionAssignor.prepare(Collections.singletonMap(str, list2));

        Map<String, List<String>> topicsByMember = Collections.singletonMap(str, list);
        client.prepareResponse(joinGroupLeaderResponse(i, str, topicsByMember, Errors.NONE));

        MockClient.RequestMatcher syncMatcher = request -> {
            SyncGroupRequest sync = (SyncGroupRequest) request;
            if (!sync.data().memberId().equals(str)) {
                return false;
            }
            if (sync.data().generationId() != i) {
                return false;
            }
            return sync.groupAssignments().containsKey(str);
        };
        client.prepareResponse(syncMatcher, (AbstractResponse) syncGroupResponse(list2, Errors.NONE));
    }

    /**
     * Builds a map that assigns the same error to every given partition.
     *
     * @param collection partitions to use as keys
     * @param errors error stored as the value for each partition
     * @return a new mutable map from each partition to {@code errors}
     */
    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> collection, Errors errors) {
        Map<TopicPartition, Errors> errorsByPartition = new HashMap<>();
        for (TopicPartition partition : collection) {
            errorsByPartition.put(partition, errors);
        }
        return errorsByPartition;
    }

    /**
     * Makes the mock client respond to an offset-commit request that matches the expected
     * offsets, reporting the same error for every partition in {@code map}.
     *
     * @param map expected offsets keyed by partition
     * @param errors error to report for every partition
     */
    private void respondToOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors) {
        MockClient.RequestMatcher matcher = offsetCommitRequestMatcher(map);
        OffsetCommitResponse response = offsetCommitResponse(partitionErrors(map.keySet(), errors));
        this.client.respond(matcher, (AbstractResponse) response);
    }

    /**
     * Creates a matcher that accepts an {@code OffsetCommitRequest} only when its offsets are
     * exactly the expected ones: same number of entries, and for every expected partition the
     * request contains that partition with an equal offset.
     *
     * @param map expected offsets keyed by partition
     * @return the request matcher
     */
    private MockClient.RequestMatcher offsetCommitRequestMatcher(Map<TopicPartition, Long> map) {
        return abstractRequest -> {
            Map<TopicPartition, Long> actualOffsets = ((OffsetCommitRequest) abstractRequest).offsets();
            if (actualOffsets.size() != map.size()) {
                return false;
            }
            for (Map.Entry<TopicPartition, Long> expected : map.entrySet()) {
                TopicPartition partition = expected.getKey();
                // containsKey first, so a missing partition is a mismatch rather than an NPE.
                if (!actualOffsets.containsKey(partition) || !actualOffsets.get(partition).equals(expected.getValue())) {
                    return false;
                }
            }
            return true;
        };
    }

    /**
     * Creates a commit callback that sets the given flag only when it is invoked with offsets
     * equal to {@code map} and without an exception.
     *
     * @param map offsets the callback is expected to receive
     * @param atomicBoolean flag set to {@code true} when both conditions hold
     * @return the callback
     */
    private OffsetCommitCallback callback(Map<TopicPartition, OffsetAndMetadata> map, AtomicBoolean atomicBoolean) {
        return (committedOffsets, exception) -> {
            if (!map.equals(committedOffsets)) {
                return;
            }
            if (exception != null) {
                return;
            }
            atomicBoolean.set(true);
        };
    }
}
