package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.class */
public class OffsetFetcherTest {
    // Test collaborators. None of them is assigned in setup(); NOTE(review): presumably
    // buildFetcher(...) creates them before each test uses them - confirm against its definition.
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private MockClient client;
    // Closed in teardown() when non-null.
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private OffsetFetcher offsetFetcher;
    // Name and id of the topic used throughout these tests.
    private final String topicName = "test";
    private final Uuid topicId = Uuid.randomUuid();
    // Topic name -> topic id mapping handed to the metadata responses built in this class.
    // The initializer reads topicId, so topicId must stay declared above this field.
    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>() {
        {
            put("test", OffsetFetcherTest.this.topicId);
        }
    };
    // Partitions 0..3 of the "test" topic.
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final TopicPartition tp2 = new TopicPartition("test", 2);
    private final TopicPartition tp3 = new TopicPartition("test", 3);
    private final int validLeaderEpoch = 0;
    // Metadata response installed by the tests as the initial view of the cluster.
    // NOTE(review): the arguments look like (1 node, "test" -> 4 partitions, topic ids) - confirm
    // against RequestTestUtils.metadataUpdateWithIds.
    private final MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWithIds(1, (Map<String, Integer>) Collections.singletonMap("test", 4), this.topicIds);
    // Same value the tests pass to time.sleep(...) before retrying a failed lookup.
    private final long retryBackoffMs = 100;
    // NOTE(review): the constructor argument is presumably an auto-tick in milliseconds - confirm against MockTime.
    private MockTime time = new MockTime(1);
    private final ApiVersions apiVersions = new ApiVersions();

    /**
     * Per-test hook that performs no initialization; the tests visible in this file call
     * {@code buildFetcher(...)} themselves before using any collaborator.
     */
    @BeforeEach
    public void setup() {
    }

    /**
     * Assigns the given partitions manually, installs the initial metadata on the mock client,
     * and then pushes a second metadata update directly into the consumer metadata.
     *
     * @param set partitions to assign
     */
    private void assignFromUser(Set<TopicPartition> set) {
        subscriptions.assignFromUser(set);
        client.updateMetadata(initialUpdateResponse);

        // The supplier yields 0 for every partition (presumably the leader epoch - the same value
        // the request matchers in the tests are built with).
        MetadataResponse epochUpdate = RequestTestUtils.metadataUpdateWithIds(
                "dummy", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 0, topicIds);
        metadata.updateWithCurrentRequestVersion(epochUpdate, false, 0L);
    }

    /**
     * Closes the metrics registry after each test, if one was ever assigned.
     *
     * @throws Exception declared for whatever closing the registry may throw
     */
    @AfterEach
    public void teardown() throws Exception {
        // Nothing to release when the metrics field was never populated.
        if (metrics == null) {
            return;
        }
        metrics.close();
    }

    /**
     * After an explicit seek, resetPositionsIfNeeded() must not send anything and the
     * sought offset must remain the position.
     */
    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        final long seekOffset = 5L;
        subscriptions.seek(tp0, seekOffset);

        offsetFetcher.resetPositionsIfNeeded();

        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);
    }

    /**
     * Requests a reset without naming a strategy and expects the position to end up at 5,
     * the value carried by the prepared ListOffsets response.
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0);

        // The response is only handed out to a request accepted by the (-2, 0) matcher.
        client.prepareResponse(
                listOffsetRequestMatcher(-2L, 0),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Requests a LATEST reset and expects the position to end up at 5 once the prepared
     * ListOffsets response has been polled.
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        client.updateMetadata(initialUpdateResponse);
        client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Feeds the fetcher an OFFSET_NOT_AVAILABLE response, then a LEADER_NOT_AVAILABLE response,
     * and checks after each that the reset is still pending and the partition is not fetchable.
     * A final successful response must complete the reset at offset 5.
     */
    @Test
    public void testFetchOffsetErrors() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // First attempt: OFFSET_NOT_AVAILABLE.
        client.prepareResponse(listOffsetRequestMatcher(-1L, 0), listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));

        // Second attempt after advancing the mock clock by 100 ms: LEADER_NOT_AVAILABLE.
        time.sleep(100L);
        client.prepareResponse(listOffsetRequestMatcher(-1L, 0), listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));

        // Third attempt succeeds.
        time.sleep(100L);
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L), false);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(subscriptions.hasValidPosition(tp0));
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        // Expected value goes first so a failure reports "expected: 5" rather than the reverse.
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /** Runs the shared isolation-level scenario with {@code READ_UNCOMMITTED}. */
    @Test
    public void testListOffsetSendsReadUncommitted() {
        final IsolationLevel level = IsolationLevel.READ_UNCOMMITTED;
        testListOffsetsSendsIsolationLevel(level);
    }

    /** Runs the shared isolation-level scenario with {@code READ_COMMITTED}. */
    @Test
    public void testListOffsetSendsReadCommitted() {
        final IsolationLevel level = IsolationLevel.READ_COMMITTED;
        testListOffsetsSendsIsolationLevel(level);
    }

    /**
     * Builds the fetcher with the given isolation level and checks that the reset completes,
     * which only happens if the outgoing ListOffsets request carries that same level.
     *
     * @param isolationLevel level passed to buildFetcher and expected on the request
     */
    private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) {
        buildFetcher(isolationLevel);
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // The prepared response is released only to a request with the expected isolation level.
        client.prepareResponse(
                request -> isolationLevel == ((ListOffsetsRequest) request).isolationLevel(),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Puts the first broker into backoff and checks that no request is queued and the EARLIEST
     * reset stays pending; after the backoff time has elapsed the reset must complete at offset 5.
     */
    @Test
    public void testresetPositionsSkipsBlackedOutConnections() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);

        // Back off the first broker listed in the initial metadata.
        final long backoffMs = 500L;
        client.updateMetadata(initialUpdateResponse);
        Node node = initialUpdateResponse.brokers().iterator().next();
        client.backoff(node, backoffMs);

        offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertEquals(0, consumerClient.pendingRequestCount());
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0));

        // Advance the mock clock by the backoff duration and try again.
        time.sleep(backoffMs);
        client.prepareResponse(
                listOffsetRequestMatcher(-2L),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Requests an EARLIEST reset and expects the position to end up at 5 once the prepared
     * ListOffsets response has been polled.
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);

        client.prepareResponse(
                listOffsetRequestMatcher(-2L, 0),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * A NOT_LEADER_OR_FOLLOWER response must leave the position invalid; after a prepared
     * metadata update has been consumed and the clock advanced, a retry must complete the
     * reset at offset 5.
     */
    @Test
    public void testresetPositionsMetadataRefresh() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // First attempt fails with NOT_LEADER_OR_FOLLOWER.
        client.prepareResponse(
                listOffsetRequestMatcher(-1L, 0),
                listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L),
                false);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // The queued metadata update must be consumed by the next poll.
        client.prepareMetadataUpdate(initialUpdateResponse);
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasPendingMetadataUpdates());

        // Retry after advancing the mock clock by 100 ms.
        time.sleep(100L);
        client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Installs metadata whose per-partition supplier returns null, answers the reset with a
     * response built with a trailing argument of 1, and checks that no metadata update is
     * requested and no last-seen leader epoch is recorded for the partition.
     */
    @Test
    public void testListOffsetNoUpdateMissingEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));

        MetadataResponse metadataWithoutEpoch = RequestTestUtils.metadataUpdateWithIds(
                "kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> null, topicIds);
        client.updateMetadata(metadataWithoutEpoch);

        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
        client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 1));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(metadata.updateRequested());
        Assertions.assertFalse(metadata.lastSeenLeaderEpoch(tp0).isPresent());
    }

    /**
     * Installs metadata whose per-partition supplier returns 1, answers the reset with a
     * response built with a trailing argument of 2, and checks that a metadata update is
     * requested and the last-seen leader epoch for the partition becomes 2.
     */
    @Test
    public void testListOffsetUpdateEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));

        MetadataResponse metadataWithEpoch = RequestTestUtils.metadataUpdateWithIds(
                "kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 1, topicIds);
        client.updateMetadata(metadataWithEpoch);

        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
        client.prepareResponse(
                listOffsetRequestMatcher(-1L, 1),
                listOffsetResponse(tp0, Errors.NONE, 1L, 5L, 2));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(metadata.updateRequested());
        // Expected value first, and both operands as int, so a failure message reads correctly.
        TestUtils.assertOptional(metadata.lastSeenLeaderEpoch(tp0),
                epoch -> Assertions.assertEquals(2, epoch.intValue()));
    }

    /**
     * Prepares the first response with the trailing flag set to true (NOTE(review): presumably
     * "disconnected" - confirm against MockClient.prepareResponse) and checks that the position
     * stays invalid, that an immediate retry sends nothing, and that a retry after 100 ms of
     * mock time completes the reset at offset 5.
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        client.prepareResponse(
                listOffsetRequestMatcher(-1L, 0),
                listOffsetResponse(Errors.NONE, 1L, 5L),
                true);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // The queued metadata update must be consumed by the next poll.
        client.prepareMetadataUpdate(initialUpdateResponse);
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasPendingMetadataUpdates());

        // Without advancing the clock, another attempt must not put a request in flight.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        time.sleep(100L);
        client.prepareResponse(
                listOffsetRequestMatcher(-1L),
                listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Changes the assignment from tp0 to tp1 while the reset request for tp0 is in flight,
     * then delivers the response and checks that nothing is left pending and tp0 is no
     * longer assigned.
     */
    @Test
    public void testAssignmentChangeWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // Send the reset request but do not answer it yet.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Swap the assignment, then let the response arrive.
        assignFromUser(Collections.singleton(tp1));
        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(client.hasPendingResponses());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(subscriptions.isAssigned(tp0));
    }

    /**
     * Seeks to 237 while the reset request is in flight and checks that the response
     * (carrying 5) does not overwrite the sought position.
     */
    @Test
    public void testSeekWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // Send the reset request but do not answer it yet.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Seek first, then let the response arrive.
        final long seekOffset = 237L;
        subscriptions.seek(tp0, seekOffset);
        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(client.hasPendingResponses());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);
    }

    /**
     * Asserts that the request is a ListOffsets request for exactly the given topic and
     * partition, and that its timestamp is -2 for an EARLIEST reset or -1 for a LATEST reset.
     * Any other strategy skips the timestamp check.
     *
     * @param topicPartition      the only partition the request may contain
     * @param offsetResetStrategy strategy that determines the expected timestamp
     * @param abstractRequest     request to inspect
     * @return always {@code true}; a mismatch surfaces as an assertion failure instead
     */
    private boolean listOffsetMatchesExpectedReset(TopicPartition topicPartition, OffsetResetStrategy offsetResetStrategy, AbstractRequest abstractRequest) {
        Assertions.assertTrue(abstractRequest instanceof ListOffsetsRequest);
        ListOffsetsRequest request = (ListOffsetsRequest) abstractRequest;

        // Exactly one topic name must be present.
        Set<String> requestedTopics = request.data().topics().stream()
                .map(ListOffsetsRequestData.ListOffsetsTopic::name)
                .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.singleton(topicPartition.topic()), requestedTopics);

        // Exactly one partition index must be present under the first topic.
        ListOffsetsRequestData.ListOffsetsTopic firstTopic = request.data().topics().get(0);
        Set<Integer> requestedPartitions = firstTopic.partitions().stream()
                .map(ListOffsetsRequestData.ListOffsetsPartition::partitionIndex)
                .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.singleton(topicPartition.partition()), requestedPartitions);

        ListOffsetsRequestData.ListOffsetsPartition firstPartition = firstTopic.partitions().get(0);
        if (offsetResetStrategy == OffsetResetStrategy.EARLIEST) {
            Assertions.assertEquals(-2L, firstPartition.timestamp());
        } else if (offsetResetStrategy == OffsetResetStrategy.LATEST) {
            Assertions.assertEquals(-1L, firstPartition.timestamp());
        }
        return true;
    }

    /**
     * Starts an EARLIEST reset, switches the requested strategy to LATEST from inside the
     * request matcher (i.e. while the first lookup is being answered), and checks that the
     * first response is not applied. A second lookup for LATEST must then set the position to 10.
     */
    @Test
    public void testEarlierOffsetResetArrivesLate() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        offsetFetcher.resetPositionsIfNeeded();

        client.prepareResponse(request -> {
            boolean matchesEarliest = listOffsetMatchesExpectedReset(tp0, OffsetResetStrategy.EARLIEST, request);
            if (matchesEarliest) {
                // Change the pending strategy before the EARLIEST response is delivered.
                subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
            }
            return matchesEarliest;
        }, listOffsetResponse(Errors.NONE, 1L, 0L));
        consumerClient.pollNoWakeup();

        // The reset is still pending, now with the LATEST strategy.
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(OffsetResetStrategy.LATEST, subscriptions.resetStrategy(tp0));

        offsetFetcher.resetPositionsIfNeeded();
        client.prepareResponse(
                request -> listOffsetMatchesExpectedReset(tp0, OffsetResetStrategy.LATEST, request),
                listOffsetResponse(Errors.NONE, 1L, 10L));
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(10L, subscriptions.position(tp0).offset);
    }

    /**
     * Switches the requested strategy from LATEST to EARLIEST while the LATEST lookup is in
     * flight, and checks that the late response leaves the EARLIEST reset pending.
     */
    @Test
    public void testChangeResetWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // Send the reset request but do not answer it yet.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Change the strategy, then let the response for the old one arrive.
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.EARLIEST);
        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(client.hasPendingResponses());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(OffsetResetStrategy.EARLIEST, subscriptions.resetStrategy(tp0));
    }

    /**
     * Requests the same LATEST reset again while the first lookup is in flight, and checks
     * that the response is still applied (position 5, no reset pending).
     */
    @Test
    public void testIdempotentResetWithInFlightReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // Send the reset request but do not answer it yet.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Repeat the identical reset request, then let the response arrive.
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);
        client.respond(listOffsetResponse(Errors.NONE, 1L, 5L));
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Answers the reset with TOPIC_AUTHORIZATION_FAILED and checks that the next call to
     * resetPositionsIfNeeded() throws a {@link TopicAuthorizationException} naming the topic.
     * A further call must not throw again or send a request; after 100 ms of mock time a
     * retry must complete the reset at offset 5.
     */
    @Test
    public void testRestOffsetsAuthorizationFailure() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // First attempt fails with an authorization error.
        client.prepareResponse(listOffsetRequestMatcher(-1L, 0), listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L), false);
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // The stored failure is raised on the following call.
        TopicAuthorizationException authError = Assertions.assertThrows(
                TopicAuthorizationException.class,
                () -> offsetFetcher.resetPositionsIfNeeded(),
                "Expected authorization error to be raised");
        Assertions.assertEquals(Collections.singleton(tp0.topic()), authError.unauthorizedTopics());

        // Calling again without advancing the clock neither throws nor sends a request.
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // Retry after advancing the mock clock by 100 ms.
        time.sleep(100L);
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
        Assertions.assertEquals(5L, subscriptions.position(tp0).offset);
    }

    /**
     * Checks that a partition marked as pending revocation keeps its position (100) but is
     * not fetchable, and that after unsubscribing and re-assigning it is fetchable again.
     */
    @Test
    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
        final long seekOffset = 100L;

        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, seekOffset);
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);
        Assertions.assertTrue(subscriptions.isFetchable(tp0));

        // While pending revocation: valid position, not fetchable, no reset needed.
        subscriptions.markPendingRevocation(Collections.singleton(tp0));
        offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertTrue(subscriptions.hasValidPosition(tp0));
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);

        // Seeking again does not change the reported position.
        subscriptions.seek(tp0, seekOffset);
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);

        // Start over with a fresh assignment of the same partition.
        subscriptions.unsubscribe();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.seek(tp0, seekOffset);
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);
        Assertions.assertTrue(subscriptions.isFetchable(tp0));
    }

    /**
     * A paused partition that needs a reset, with a response prepared, must get its position
     * (10) set while remaining not fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.pause(tp0);
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        client.prepareResponse(
                listOffsetRequestMatcher(-1L, 0),
                listOffsetResponse(Errors.NONE, 1L, 10L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertTrue(subscriptions.hasValidPosition(tp0));
        Assertions.assertEquals(10L, subscriptions.position(tp0).offset);
    }

    /**
     * A paused partition with a pending reset and no prepared response must still need the
     * reset, be not fetchable, and have no valid position after a poll.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0);
        subscriptions.pause(tp0);

        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
    }

    /**
     * A paused partition that was sought to 10 must keep that position, need no reset,
     * and remain not fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));
        final long seekOffset = 10L;
        subscriptions.seek(tp0, seekOffset);
        subscriptions.pause(tp0);

        offsetFetcher.resetPositionsIfNeeded();

        Assertions.assertFalse(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertTrue(subscriptions.hasValidPosition(tp0));
        Assertions.assertEquals(seekOffset, subscriptions.position(tp0).offset);
    }

    /**
     * With nothing prepared on the mock client, offsetsForTimes must give up with a
     * {@link TimeoutException} once its 100 ms timer runs out.
     */
    @Test
    public void testGetOffsetsForTimesTimeout() {
        buildFetcher();
        Map<TopicPartition, Long> timestampsToSearch =
                Collections.singletonMap(new TopicPartition("test", 2), 1000L);

        Assertions.assertThrows(TimeoutException.class,
                () -> offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(100L)));
    }

    /**
     * Checks that an empty timestamp map yields an empty result, then runs the unknown-offset
     * scenario and a series of error-combination scenarios through the helper methods
     * defined elsewhere in this class.
     */
    @Test
    public void testGetOffsetsForTimes() {
        buildFetcher();

        // Diamond instead of a raw HashMap, so the argument is type-checked without an unchecked conversion.
        Assertions.assertTrue(offsetFetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());

        testGetOffsetsForTimesWithUnknownOffset();
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, null);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 10L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, 10L, 10L);
        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, 10L, 10L);
        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, 10L, 10L);
        testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 10L);
        testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, null);
        testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 10L);
    }

    /**
     * A FENCED_LEADER_EPOCH response must leave the reset pending and the position invalid,
     * and must make the metadata report zero time until its next update.
     */
    @Test
    public void testGetOffsetsFencedLeaderEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        client.updateMetadata(initialUpdateResponse);
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        client.prepareResponse(listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
    }

    /**
     * For each listed error code: the first ListOffsets response succeeds for tp0 and fails for
     * tp1 with that error. A metadata update that moves tp1 to a different leader is queued, and
     * the retry for tp1 is expected to go to the new leader with leader epoch 3. A second response
     * queued for the original leader is expected to remain unconsumed at the end.
     */
    @Test
    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
        List<Errors> retriableErrors = Arrays.asList(
                Errors.NOT_LEADER_OR_FOLLOWER,
                Errors.REPLICA_NOT_AVAILABLE,
                Errors.KAFKA_STORAGE_ERROR,
                Errors.OFFSET_NOT_AVAILABLE,
                Errors.LEADER_NOT_AVAILABLE,
                Errors.FENCED_LEADER_EPOCH,
                Errors.UNKNOWN_LEADER_EPOCH);

        final int newLeaderEpoch = 3;
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(
                "dummy", 3, Collections.singletonMap("test", Errors.NONE), Collections.singletonMap("test", 4),
                tp -> newLeaderEpoch, topicIds);

        // The scenario only makes sense if the update really moves tp1 to another node.
        Node originalLeader = initialUpdateResponse.buildCluster().leaderFor(tp1);
        Node newLeader = updatedMetadata.buildCluster().leaderFor(tp1);
        Assertions.assertNotEquals(originalLeader, newLeader);

        for (Errors retriableError : retriableErrors) {
            buildFetcher();
            subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));
            client.updateMetadata(initialUpdateResponse);

            final long fetchTimestamp = 10L;
            ListOffsetsResponseData.ListOffsetsPartitionResponse tp0Success =
                    new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                            .setPartitionIndex(tp0.partition())
                            .setErrorCode(Errors.NONE.code())
                            .setTimestamp(fetchTimestamp)
                            .setOffset(4L);

            // 1) First answer from the original leader: tp0 succeeds, tp1 fails with the error under test.
            ListOffsetsResponseData.ListOffsetsPartitionResponse tp1Failure =
                    new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                            .setPartitionIndex(tp1.partition())
                            .setErrorCode(retriableError.code())
                            .setTimestamp(-1L)
                            .setOffset(-1L);
            ListOffsetsResponse firstAttempt = new ListOffsetsResponse(new ListOffsetsResponseData()
                    .setThrottleTimeMs(0)
                    .setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse()
                            .setName(tp0.topic())
                            .setPartitions(Arrays.asList(tp0Success, tp1Failure)))));
            client.prepareResponseFrom(request -> {
                if (!(request instanceof ListOffsetsRequest)) {
                    return false;
                }
                // Both partitions are expected in one request, tp1 before tp0, each with epoch -1.
                List<ListOffsetsRequestData.ListOffsetsTopic> expectedTopics = Collections.singletonList(
                        new ListOffsetsRequestData.ListOffsetsTopic()
                                .setName(tp0.topic())
                                .setPartitions(Arrays.asList(
                                        new ListOffsetsRequestData.ListOffsetsPartition()
                                                .setPartitionIndex(tp1.partition())
                                                .setTimestamp(fetchTimestamp)
                                                .setCurrentLeaderEpoch(-1),
                                        new ListOffsetsRequestData.ListOffsetsPartition()
                                                .setPartitionIndex(tp0.partition())
                                                .setTimestamp(fetchTimestamp)
                                                .setCurrentLeaderEpoch(-1))));
                return ((ListOffsetsRequest) request).topics().equals(expectedTopics);
            }, firstAttempt, originalLeader);

            // 2) Metadata update that moves tp1 to the new leader.
            client.prepareMetadataUpdate(updatedMetadata);

            // 3) A NOT_LEADER_OR_FOLLOWER answer queued for the original leader.
            ListOffsetsResponseData.ListOffsetsPartitionResponse tp1NotLeader =
                    new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                            .setPartitionIndex(tp1.partition())
                            .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
                            .setTimestamp(-1L)
                            .setOffset(-1L);
            ListOffsetsResponse staleLeaderResponse = new ListOffsetsResponse(new ListOffsetsResponseData()
                    .setThrottleTimeMs(0)
                    .setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse()
                            .setName(tp0.topic())
                            .setPartitions(Arrays.asList(tp0Success, tp1NotLeader)))));
            client.prepareResponseFrom(staleLeaderResponse, originalLeader);

            // 4) Successful answer from the new leader, released only to a tp1 request with epoch 3.
            client.prepareResponseFrom(request -> {
                if (!(request instanceof ListOffsetsRequest)) {
                    return false;
                }
                ListOffsetsRequestData.ListOffsetsPartition expectedPartition =
                        new ListOffsetsRequestData.ListOffsetsPartition()
                                .setPartitionIndex(tp1.partition())
                                .setTimestamp(fetchTimestamp)
                                .setCurrentLeaderEpoch(newLeaderEpoch);
                return expectedPartition.equals(((ListOffsetsRequest) request).topics().get(0).partitions().get(0));
            }, listOffsetResponse(tp1, Errors.NONE, 10L, 5L), newLeader);

            Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Utils.mkMap(
                    Utils.mkEntry(tp0, new OffsetAndTimestamp(4L, fetchTimestamp)),
                    Utils.mkEntry(tp1, new OffsetAndTimestamp(5L, fetchTimestamp)));
            Map<TopicPartition, Long> timestampsToSearch = Utils.mkMap(
                    Utils.mkEntry(tp0, fetchTimestamp),
                    Utils.mkEntry(tp1, fetchTimestamp));
            Map<TopicPartition, OffsetAndTimestamp> actualOffsets =
                    offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Integer.MAX_VALUE));
            Assertions.assertEquals(expectedOffsets, actualOffsets);

            // Exactly one prepared response must be left over (presumably the one queued in step 3).
            Assertions.assertEquals(1, client.numAwaitingResponses());
        }
    }

    /**
     * Verifies that an {@code UNKNOWN_LEADER_EPOCH} error on the reset lookup leaves the partition
     * still needing a reset, without a valid position, and with a metadata update due immediately.
     */
    @Test
    public void testGetOffsetsUnknownLeaderEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        subscriptions.requestOffsetReset(tp0, OffsetResetStrategy.LATEST);

        // Queue the failing response before triggering the reset.
        client.prepareResponse(listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L));
        offsetFetcher.resetPositionsIfNeeded();
        consumerClient.pollNoWakeup();

        // The reset did not complete, so the partition stays unusable.
        Assertions.assertTrue(subscriptions.isOffsetResetNeeded(tp0));
        Assertions.assertFalse(subscriptions.isFetchable(tp0));
        Assertions.assertFalse(subscriptions.hasValidPosition(tp0));

        // A zero delay means the next metadata update is due right away.
        final long nowMs = time.milliseconds();
        Assertions.assertEquals(0L, metadata.timeToNextUpdate(nowMs));
    }

    /**
     * Verifies that the ListOffsets request issued for an offset reset carries the leader epoch
     * supplied by the most recent metadata update (99 in this test).
     */
    @Test
    public void testGetOffsetsIncludesLeaderEpoch() {
        buildFetcher();
        subscriptions.assignFromUser(Collections.singleton(tp0));
        client.updateMetadata(initialUpdateResponse);

        // Second metadata update: the epoch supplier returns 99 for every partition.
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(),
            Collections.singletonMap("test", 4), partition -> 99, topicIds));

        subscriptions.requestOffsetReset(tp0);
        offsetFetcher.resetPositionsIfNeeded();

        // The matcher inspects the outgoing request and checks its leader epoch.
        client.prepareResponse(body -> {
            if (!(body instanceof ListOffsetsRequest)) {
                Assertions.fail("Should have seen ListOffsetRequest");
                return false;
            }
            ListOffsetsRequest request = (ListOffsetsRequest) body;
            int epoch = request.topics().get(0).partitions().get(0).currentLeaderEpoch();
            Assertions.assertTrue(epoch != -1, "Expected Fetcher to set leader epoch in request");
            // Expected value goes first so a failure reports "expected: <99> but was: <actual>".
            Assertions.assertEquals(99, epoch, "Expected leader epoch to match epoch from metadata update");
            return true;
        }, listOffsetResponse(Errors.NONE, 1L, 5L));
        consumerClient.pollNoWakeup();
    }

    /**
     * Verifies that {@code offsetsForTimes} returns offsets for every requested partition even
     * though one topic ("another-topic") is missing from the initial metadata and only appears
     * in a later prepared metadata update.
     */
    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
        buildFetcher();
        subscriptions.assignFromUser(Utils.mkSet(tp0, tp1));

        final String anotherTopic = "another-topic";
        final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);

        client.reset();

        // Metadata initially covers only topic "test".
        Map<String, Integer> initialPartitionCounts = Collections.singletonMap("test", 2);
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(3, initialPartitionCounts, topicIds);
        client.updateMetadata(initialMetadata);

        // The first fetch round sees the same metadata again and answers for tp0 and tp1.
        client.prepareMetadataUpdate(initialMetadata);
        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), metadata.fetch().leaderFor(tp0));
        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), metadata.fetch().leaderFor(tp1));

        // The second metadata update adds "another-topic"; its partition is answered afterwards.
        Map<String, Integer> updatedPartitionCounts = new HashMap<>();
        updatedPartitionCounts.put("test", 2);
        updatedPartitionCounts.put(anotherTopic, 1);
        topicIds.put(anotherTopic, Uuid.randomUuid());
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(3, updatedPartitionCounts, topicIds));
        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), metadata.fetch().leaderFor(t2p0));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, -1L);
        timestampsToSearch.put(tp1, -1L);
        timestampsToSearch.put(t2p0, -1L);
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Long.MAX_VALUE));

        Assertions.assertNotNull(offsets.get(tp0), "Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + tp0);
        Assertions.assertNotNull(offsets.get(tp1), "Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + tp1);
        Assertions.assertNotNull(offsets.get(t2p0), "Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + t2p0);
        Assertions.assertEquals(11L, offsets.get(tp0).offset());
        Assertions.assertEquals(32L, offsets.get(tp1).offset());
        Assertions.assertEquals(54L, offsets.get(t2p0).offset());
    }

    /**
     * Verifies that {@code offsetsForTimes} still returns the offset for {@code tp0} when the
     * first prepared ListOffsets response is flagged with {@code true} (presumably a disconnect,
     * per the test name) and a second response is available from the partition leader. Also
     * checks that "another-topic" ends up in the cluster metadata.
     */
    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() {
        buildFetcher();
        final String anotherTopic = "another-topic";
        final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
        subscriptions.assignFromUser(Utils.mkSet(tp0, t2p0));

        client.reset();

        // Metadata initially covers only topic "test".
        Map<String, Integer> initialPartitionCounts = Collections.singletonMap("test", 1);
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, initialPartitionCounts, topicIds));

        // The prepared update adds "another-topic".
        Map<String, Integer> updatedPartitionCounts = new HashMap<>();
        updatedPartitionCounts.put("test", 1);
        updatedPartitionCounts.put(anotherTopic, 1);
        topicIds.put(anotherTopic, Uuid.randomUuid());
        client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(1, updatedPartitionCounts, topicIds));

        // First response carries the flag `true`; the second one is routed to tp0's leader.
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), true);
        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), metadata.fetch().leaderFor(tp0));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, -1L);
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Long.MAX_VALUE));

        Assertions.assertNotNull(offsets.get(tp0), "Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + tp0);
        Assertions.assertEquals(11L, offsets.get(tp0).offset());
        Assertions.assertNotNull(metadata.fetch().partitionCountForTopic(anotherTopic));
    }

    /**
     * Verifies that {@code offsetsForTimes} with a zero-length timer returns a map that contains
     * every requested partition mapped to {@code null}.
     */
    @Test
    public void testListOffsetsWithZeroTimeout() {
        buildFetcher();

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, -2L);
        timestampsToSearch.put(tp1, -2L);

        // Every requested key is present, each with a null value.
        Map<TopicPartition, OffsetAndTimestamp> expected = new HashMap<>();
        expected.put(tp0, null);
        expected.put(tp1, null);

        Assertions.assertEquals(expected, offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(0L)));
    }

    /**
     * Verifies that {@code offsetsForTimes} throws {@link TimeoutException} when the only
     * prepared response reports {@code NOT_LEADER_OR_FOLLOWER} for {@code tp0} and
     * {@code UNKNOWN_TOPIC_OR_PARTITION} for {@code tp1}, and the timer is 1 ms.
     */
    @Test
    public void testBatchedListOffsetsMetadataErrors() {
        buildFetcher();

        ListOffsetsResponseData.ListOffsetsPartitionResponse tp0Error =
            new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(tp0.partition())
                .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
                .setTimestamp(-1L)
                .setOffset(-1L);
        ListOffsetsResponseData.ListOffsetsPartitionResponse tp1Error =
            new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(tp1.partition())
                .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
                .setTimestamp(-1L)
                .setOffset(-1L);

        // Both partition errors are batched under a single topic entry.
        ListOffsetsResponseData.ListOffsetsTopicResponse topicResponse =
            new ListOffsetsResponseData.ListOffsetsTopicResponse()
                .setName(tp0.topic())
                .setPartitions(Arrays.asList(tp0Error, tp1Error));
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Collections.singletonList(topicResponse));
        client.prepareResponse(new ListOffsetsResponse(responseData));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, -2L);
        timestampsToSearch.put(tp1, -2L);

        Assertions.assertThrows(TimeoutException.class,
            () -> offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(1L)));
    }

    /**
     * Runs an {@code offsetsForTimes} lookup for partition 0 of "topic2" and for {@code tp1},
     * where the first response for each partition may carry an error and the second succeeds.
     *
     * @param errors  error returned in the first response for the "topic2" partition
     * @param errors2 error returned in the first response for {@code tp1}
     * @param j       value used as both timestamp and offset in the "topic2" responses
     * @param l       expected timestamp and offset for the "topic2" partition, or {@code null}
     *                when the result for that partition is expected to be {@code null}
     */
    private void testGetOffsetsForTimesWithError(Errors errors, Errors errors2, long j, Long l) {
        client.reset();
        final String topic2 = "topic2";
        final TopicPartition t2p0 = new TopicPartition(topic2, 0);

        metadata.bootstrap(ClientUtils.parseAndValidateAddresses(
            Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.USE_ALL_DNS_IPS));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put("test", 2);
        partitionCounts.put(topic2, 1);
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(2, partitionCounts, topicIds);
        Cluster updatedCluster = updatedMetadata.buildCluster();

        client.prepareMetadataUpdate(updatedMetadata, true);

        // First round: responses carrying the caller-supplied errors.
        client.prepareResponseFrom(listOffsetResponse(t2p0, errors, j, j), updatedCluster.leaderFor(t2p0));
        client.prepareResponseFrom(listOffsetResponse(tp1, errors2, 100L, 100L), updatedCluster.leaderFor(tp1));
        // Second round: successful responses for both partitions.
        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, j, j), updatedCluster.leaderFor(t2p0));
        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 100L, 100L), updatedCluster.leaderFor(tp1));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(t2p0, 0L);
        timestampsToSearch.put(tp1, 0L);
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Long.MAX_VALUE));

        if (l == null) {
            Assertions.assertNull(offsets.get(t2p0));
        } else {
            Assertions.assertEquals(l.longValue(), offsets.get(t2p0).timestamp());
            Assertions.assertEquals(l.longValue(), offsets.get(t2p0).offset());
        }
        Assertions.assertEquals(100L, offsets.get(tp1).timestamp());
        Assertions.assertEquals(100L, offsets.get(tp1).offset());
    }

    /**
     * Checks that when the ListOffsets response for {@code tp0} has no error but carries
     * timestamp -1 and offset -1, {@code offsetsForTimes} returns a map that contains
     * {@code tp0} with a {@code null} value.
     */
    private void testGetOffsetsForTimesWithUnknownOffset() {
        client.reset();

        Map<String, Integer> partitionCounts = Collections.singletonMap("test", 1);
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, partitionCounts, topicIds));

        ListOffsetsResponseData.ListOffsetsPartitionResponse unknownOffset =
            new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(tp0.partition())
                .setErrorCode(Errors.NONE.code())
                .setTimestamp(-1L)
                .setOffset(-1L);
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse()
                .setName(tp0.topic())
                .setPartitions(Collections.singletonList(unknownOffset))));
        client.prepareResponseFrom(new ListOffsetsResponse(responseData), metadata.fetch().leaderFor(tp0));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, 0L);
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Long.MAX_VALUE));

        // The key must be present, but mapped to null.
        Assertions.assertTrue(offsets.containsKey(tp0));
        Assertions.assertNull(offsets.get(tp0));
    }

    /**
     * Verifies two things: an empty lookup returns an empty map, and, with the node restricted to
     * ListOffsets version 0, a response with no error and an empty old-style offset list yields
     * {@code tp0} mapped to {@code null}.
     */
    @Test
    public void testGetOffsetsForTimesWithUnknownOffsetV0() {
        buildFetcher();

        // An empty search map produces an empty result.
        Assertions.assertTrue(offsetFetcher.offsetsForTimes(new HashMap<>(), time.timer(100L)).isEmpty());

        client.reset();
        Map<String, Integer> partitionCounts = Collections.singletonMap("test", 1);
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, partitionCounts, topicIds));

        // Restrict the first node to ListOffsets v0 only.
        Node node = metadata.fetch().nodes().get(0);
        apiVersions.update(node.idString(), NodeApiVersions.create(ApiKeys.LIST_OFFSETS.id, (short) 0, (short) 0));

        ListOffsetsResponseData.ListOffsetsPartitionResponse emptyOldStyle =
            new ListOffsetsResponseData.ListOffsetsPartitionResponse()
                .setPartitionIndex(tp0.partition())
                .setErrorCode(Errors.NONE.code())
                .setTimestamp(-1L)
                .setOldStyleOffsets(Collections.emptyList());
        ListOffsetsResponseData responseData = new ListOffsetsResponseData()
            .setThrottleTimeMs(0)
            .setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse()
                .setName(tp0.topic())
                .setPartitions(Collections.singletonList(emptyOldStyle))));
        client.prepareResponseFrom(new ListOffsetsResponse(responseData), metadata.fetch().leaderFor(tp0));

        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        timestampsToSearch.put(tp0, 0L);
        Map<TopicPartition, OffsetAndTimestamp> offsets =
            offsetFetcher.offsetsForTimes(timestampsToSearch, time.timer(Long.MAX_VALUE));

        Assertions.assertTrue(offsets.containsKey(tp0));
        Assertions.assertNull(offsets.get(tp0));
    }

    /**
     * Verifies that offset validation groups partitions by leader: for each node, the prepared
     * OffsetsForLeaderEpoch response is matched only by a request containing exactly the
     * partitions led by that node, and afterwards no assigned partition is awaiting validation.
     */
    @Test
    public void testOffsetValidationRequestGrouping() {
        buildFetcher();
        assignFromUser(Utils.mkSet(tp0, tp1, tp2, tp3));

        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 3, Collections.emptyMap(),
                Collections.singletonMap("test", 4), partition -> 5, topicIds),
            false, 0L);

        // Seek every partition with epoch 4 so each one needs validation.
        for (TopicPartition assigned : subscriptions.assignedPartitions()) {
            Metadata.LeaderAndEpoch leaderAndEpoch =
                new Metadata.LeaderAndEpoch(metadata.currentLeader(assigned).leader, Optional.of(4));
            subscriptions.seekUnvalidated(assigned,
                new SubscriptionState.FetchPosition(0L, Optional.of(4), leaderAndEpoch));
        }

        Set<TopicPartition> allRequestedPartitions = new HashSet<>();

        for (Node node : metadata.fetch().nodes()) {
            apiVersions.update(node.idString(), NodeApiVersions.create());

            // Partitions whose current leader is this node.
            Set<TopicPartition> expectedPartitions = subscriptions.assignedPartitions().stream()
                .filter(partition -> metadata.currentLeader(partition).leader.equals(Optional.of(node)))
                .collect(Collectors.toSet());

            // No partition may be attributed to more than one node, and every node leads at least one.
            Assertions.assertTrue(expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
            Assertions.assertTrue(expectedPartitions.size() > 0);
            allRequestedPartitions.addAll(expectedPartitions);

            OffsetForLeaderEpochResponseData responseData = new OffsetForLeaderEpochResponseData();
            expectedPartitions.forEach(partition -> {
                OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult =
                    responseData.topics().find(partition.topic());
                if (topicResult == null) {
                    topicResult = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
                        .setTopic(partition.topic());
                    responseData.topics().add(topicResult);
                }
                topicResult.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset()
                    .setPartition(partition.partition())
                    .setErrorCode(Errors.NONE.code())
                    .setLeaderEpoch(4)
                    .setEndOffset(0L));
            });

            // The response is only handed out if the request holds exactly this node's partitions.
            client.prepareResponseFrom(body -> {
                OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest) body;
                return expectedPartitions.equals(offsetForLeaderPartitionMap(request.data()).keySet());
            }, new OffsetsForLeaderEpochResponse(responseData), node);
        }

        Assertions.assertEquals(subscriptions.assignedPartitions(), allRequestedPartitions);

        offsetFetcher.validatePositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertTrue(subscriptions.assignedPartitions().stream()
            .noneMatch(subscriptions::awaitingValidation));
    }

    /**
     * Verifies that the first {@code validatePositionsIfNeeded} call, made before the node's API
     * versions are registered, only results in a connection to the node and leaves the partition
     * awaiting validation; once the API versions are registered and a response is prepared,
     * validation completes and the position stays at offset 20.
     */
    @Test
    public void testOffsetValidationAwaitsNodeApiVersion() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 1, topicIds),
            false, 0L);

        Node node = metadata.fetch().nodes().get(0);
        Assertions.assertFalse(client.isConnected(node.idString()));

        // Seeking alone must not open a connection.
        Metadata.LeaderAndEpoch leaderAndEpoch =
            new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
        subscriptions.seekUnvalidated(tp0,
            new SubscriptionState.FetchPosition(20L, Optional.of(1), leaderAndEpoch));
        Assertions.assertFalse(client.isConnected(node.idString()));
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));

        // First attempt: still awaiting validation, but the node is now connected.
        offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));
        Assertions.assertTrue(client.isConnected(node.idString()));

        // Register API versions, then validate for real.
        apiVersions.update(node.idString(), NodeApiVersions.create());
        client.prepareResponseFrom(prepareOffsetsForLeaderEpochResponse(tp0, 1, 30L), node);
        offsetFetcher.validatePositionsIfNeeded();
        consumerClient.pollNoWakeup();

        Assertions.assertFalse(subscriptions.awaitingValidation(tp0));
        Assertions.assertEquals(20L, subscriptions.position(tp0).offset);
    }

    /**
     * Verifies that offset validation is skipped when the broker advertises
     * OFFSET_FOR_LEADER_EPOCH versions 0 through 2 only: after a leader-epoch change the
     * partition is not awaiting validation, both via {@code validatePositionsIfNeeded} and via
     * {@code validatePositionsOnMetadataChange}, and a fetch can be sent.
     */
    @Test
    public void testOffsetValidationSkippedForOldBroker() {
        IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        MetricConfig metricConfig = new MetricConfig();
        LogContext logContext = new LogContext();
        SubscriptionState subscriptionState = new SubscriptionState(logContext, OffsetResetStrategy.EARLIEST);
        buildFetcher(metricConfig, isolationLevel, Long.MAX_VALUE, subscriptionState, logContext);

        FetchMetricsRegistry metricsRegistry =
            new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
        FetchConfig<byte[], byte[]> fetchConfig = new FetchConfig<>(
            1, Integer.MAX_VALUE, 0, 1000, Integer.MAX_VALUE, true, "",
            new ByteArrayDeserializer(), new ByteArrayDeserializer(), isolationLevel);
        Fetcher<byte[], byte[]> fetcher = new Fetcher<>(logContext, consumerClient, metadata, subscriptions,
            fetchConfig, new FetchMetricsManager(metrics, metricsRegistry), time);

        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        // Initial metadata: leader epoch 1.
        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 1, topicIds),
            false, 0L);

        // The broker only supports OFFSET_FOR_LEADER_EPOCH v0-v2.
        Node node = metadata.fetch().nodes().get(0);
        apiVersions.update(node.idString(),
            NodeApiVersions.create(ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short) 0, (short) 2));

        {
            // Seek at epoch 1, bump the epoch to 2, then validate explicitly.
            Metadata.LeaderAndEpoch leaderAndEpoch =
                new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
            subscriptions.seekUnvalidated(tp0,
                new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));

            metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                    partition -> 2, topicIds),
                false, 0L);

            offsetFetcher.validatePositionsIfNeeded();
            Assertions.assertFalse(subscriptions.awaitingValidation(tp0));
        }

        {
            // Same scenario, but validation is driven by the metadata-change hook.
            Metadata.LeaderAndEpoch leaderAndEpoch =
                new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
            subscriptions.seekUnvalidated(tp0,
                new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));

            metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                    partition -> 2, topicIds),
                false, 0L);

            offsetFetcher.validatePositionsOnMetadataChange();
            Assertions.assertEquals(1, fetcher.sendFetches());
            Assertions.assertFalse(subscriptions.awaitingValidation(tp0));
        }
    }

    /**
     * Verifies that a partition awaiting validation stops awaiting it, without any connection
     * being asserted, after a metadata update built with response version 8 and an epoch
     * supplier that returns {@code null}.
     */
    @Test
    public void testOffsetValidationSkippedForOldResponse() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 1, topicIds),
            false, 0L);

        Node node = metadata.fetch().nodes().get(0);
        Assertions.assertFalse(client.isConnected(node.idString()));

        // Seek with a position and leader epoch; no connection is made yet.
        Metadata.LeaderAndEpoch leaderAndEpoch =
            new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
        subscriptions.seekUnvalidated(tp0,
            new SubscriptionState.FetchPosition(20L, Optional.of(1), leaderAndEpoch));
        Assertions.assertFalse(client.isConnected(node.idString()));
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));

        // Metadata update at version 8 whose epoch supplier yields null for every partition.
        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> null, MetadataResponse.PartitionMetadata::new, (short) 8, topicIds),
            false, 0L);

        offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertFalse(subscriptions.awaitingValidation(tp0));
    }

    /** Runs the shared validation scenario with leader epoch -1, end offset 0 and the EARLIEST reset policy. */
    @Test
    public void testOffsetValidationresetPositionForUndefinedEpochWithDefinedResetPolicy() {
        final int leaderEpochResult = -1;
        final long endOffsetResult = 0L;
        testOffsetValidationWithGivenEpochOffset(leaderEpochResult, endOffsetResult, OffsetResetStrategy.EARLIEST);
    }

    /** Runs the shared validation scenario with leader epoch 2, end offset -1 and the EARLIEST reset policy. */
    @Test
    public void testOffsetValidationresetPositionForUndefinedOffsetWithDefinedResetPolicy() {
        final int leaderEpochResult = 2;
        final long endOffsetResult = -1L;
        testOffsetValidationWithGivenEpochOffset(leaderEpochResult, endOffsetResult, OffsetResetStrategy.EARLIEST);
    }

    /** Runs the shared validation scenario with leader epoch -1, end offset 0 and the NONE reset policy. */
    @Test
    public void testOffsetValidationresetPositionForUndefinedEpochWithUndefinedResetPolicy() {
        final int leaderEpochResult = -1;
        final long endOffsetResult = 0L;
        testOffsetValidationWithGivenEpochOffset(leaderEpochResult, endOffsetResult, OffsetResetStrategy.NONE);
    }

    /** Runs the shared validation scenario with leader epoch 2, end offset -1 and the NONE reset policy. */
    @Test
    public void testOffsetValidationresetPositionForUndefinedOffsetWithUndefinedResetPolicy() {
        final int leaderEpochResult = 2;
        final long endOffsetResult = -1L;
        testOffsetValidationWithGivenEpochOffset(leaderEpochResult, endOffsetResult, OffsetResetStrategy.NONE);
    }

    /** Runs the shared validation scenario with leader epoch 1, end offset 1 and the NONE reset policy. */
    @Test
    public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() {
        final int leaderEpochResult = 1;
        final long endOffsetResult = 1L;
        testOffsetValidationWithGivenEpochOffset(leaderEpochResult, endOffsetResult, OffsetResetStrategy.NONE);
    }

    /**
     * Shared scenario: seeks {@code tp0} to offset 5 at epoch 1, sends a validation request, and
     * answers it with the given leader epoch and end offset.
     *
     * <p>With a reset policy other than {@code NONE}, the partition must no longer be awaiting
     * validation. With {@code NONE}, the next validation call must throw
     * {@link LogTruncationException} reporting offset 5 for {@code tp0}, and the partition stays
     * awaiting validation.
     *
     * @param i                   leader epoch returned in the OffsetsForLeaderEpoch response
     * @param j                   end offset returned in the OffsetsForLeaderEpoch response
     * @param offsetResetStrategy reset policy the fetcher is built with
     */
    private void testOffsetValidationWithGivenEpochOffset(int i, long j, OffsetResetStrategy offsetResetStrategy) {
        buildFetcher(offsetResetStrategy);
        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        final int initialEpoch = 1;
        final long initialOffset = 5L;

        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> initialEpoch, topicIds),
            false, 0L);

        Node node = metadata.fetch().nodes().get(0);
        apiVersions.update(node.idString(), NodeApiVersions.create());

        Metadata.LeaderAndEpoch leaderAndEpoch =
            new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(initialEpoch));
        subscriptions.seekUnvalidated(tp0,
            new SubscriptionState.FetchPosition(initialOffset, Optional.of(initialEpoch), leaderAndEpoch));

        // Send the validation request and confirm it is in flight.
        offsetFetcher.validatePositionsIfNeeded();
        consumerClient.poll(time.timer(Duration.ZERO));
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Answer with the caller-supplied epoch and end offset.
        client.respond(offsetsForLeaderEpochRequestMatcher(tp0), prepareOffsetsForLeaderEpochResponse(tp0, i, j));
        consumerClient.poll(time.timer(Duration.ZERO));

        if (offsetResetStrategy == OffsetResetStrategy.NONE) {
            LogTruncationException thrown = Assertions.assertThrows(LogTruncationException.class,
                () -> offsetFetcher.validatePositionsIfNeeded());

            Assertions.assertEquals(Collections.singletonMap(tp0, initialOffset), thrown.offsetOutOfRangePartitions());

            if (j == -1 || i == -1) {
                // An undefined epoch or offset gives no divergent offset to report.
                Assertions.assertEquals(Collections.emptyMap(), thrown.divergentOffsets());
            } else {
                Assertions.assertEquals(
                    Collections.singletonMap(tp0, new OffsetAndMetadata(j, Optional.of(i), "")),
                    thrown.divergentOffsets());
            }
            Assertions.assertTrue(subscriptions.awaitingValidation(tp0));
        } else {
            offsetFetcher.validatePositionsIfNeeded();
            Assertions.assertFalse(subscriptions.awaitingValidation(tp0));
        }
    }

    /**
     * Verifies that when the position is changed by another unvalidated seek while a validation
     * request is in flight, the response to that earlier request does not complete validation:
     * the partition is still awaiting validation afterwards.
     */
    @Test
    public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        final Optional<Integer> epochOne = Optional.of(1);
        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 1, topicIds),
            false, 0L);

        Node node = metadata.fetch().nodes().get(0);
        apiVersions.update(node.idString(), NodeApiVersions.create());

        Metadata.LeaderAndEpoch leaderAndEpoch =
            new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, epochOne);
        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(0L, epochOne, leaderAndEpoch));

        // Put a validation request in flight for the position at offset 0.
        offsetFetcher.validatePositionsIfNeeded();
        consumerClient.poll(time.timer(Duration.ZERO));
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));
        Assertions.assertTrue(client.hasInFlightRequests());

        // Seek again, to offset 5, before the response arrives.
        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(5L, epochOne, leaderAndEpoch));
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));

        client.respond(offsetsForLeaderEpochRequestMatcher(tp0), prepareOffsetsForLeaderEpochResponse(tp0, 0, 0L));
        consumerClient.poll(time.timer(Duration.ZERO));

        // The response belonged to the old position, so validation is still pending.
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));
    }

    /**
     * Verifies that a validation response is ignored when the leader epoch has moved on since
     * the request was sent: the partition stays awaiting validation after a response for epoch 2
     * while the current epoch is 3, and validation succeeds once a response for epoch 3 arrives.
     */
    @Test
    public void testOffsetValidationFencing() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        Map<String, Integer> partitionCounts = new HashMap<>();
        partitionCounts.put(tp0.topic(), 4);

        // Initial metadata: leader epoch 1.
        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 1, topicIds),
            false, 0L);

        Node node = metadata.fetch().nodes().get(0);
        apiVersions.update(node.idString(), NodeApiVersions.create());

        // Start from a validated position at epoch 1.
        Metadata.LeaderAndEpoch leaderAndEpoch =
            new Metadata.LeaderAndEpoch(metadata.currentLeader(tp0).leader, Optional.of(1));
        subscriptions.seekValidated(tp0,
            new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));

        // Bumping the epoch to 2 puts the position back into validation.
        metadata.updateWithCurrentRequestVersion(
            RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts,
                partition -> 2, topicIds),
            false, 0L);
        offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0));

        // Complete validation by hand, move the position to epoch 2, then advance the leader to epoch 3.
        subscriptions.completeValidation(tp0);
        subscriptions.position(tp0, new SubscriptionState.FetchPosition(10L, Optional.of(2),
            new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(2))));
        subscriptions.maybeValidatePositionForCurrentLeader(apiVersions, tp0,
            new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(3)));

        // The response for the outdated epoch 2 must not complete validation.
        client.prepareResponse(prepareOffsetsForLeaderEpochResponse(tp0, 2, 10L));
        consumerClient.pollNoWakeup();
        Assertions.assertTrue(subscriptions.awaitingValidation(tp0), "Expected validation to fail since leader epoch changed");

        // A fresh validation answered at epoch 3 succeeds.
        offsetFetcher.validatePositionsIfNeeded();
        client.prepareResponse(prepareOffsetsForLeaderEpochResponse(tp0, 3, 10L));
        consumerClient.pollNoWakeup();
        Assertions.assertFalse(subscriptions.awaitingValidation(tp0), "Expected validation to succeed with latest epoch");
    }

    /** Verifies that {@code beginningOffsets} returns the offset from the prepared response for a single partition. */
    @Test
    public void testBeginningOffsets() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        final long expectedOffset = 2L;
        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, -2L, expectedOffset));

        Assertions.assertEquals(
            Collections.singletonMap(tp0, expectedOffset),
            offsetFetcher.beginningOffsets(Collections.singleton(tp0), time.timer(5000L)));
    }

    /** Verifies that requesting the same partition twice in {@code beginningOffsets} yields a single map entry. */
    @Test
    public void testBeginningOffsetsDuplicateTopicPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        final long expectedOffset = 2L;
        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, -2L, expectedOffset));

        // tp0 is listed twice on purpose.
        Assertions.assertEquals(
            Collections.singletonMap(tp0, expectedOffset),
            offsetFetcher.beginningOffsets(Arrays.asList(tp0, tp0), time.timer(5000L)));
    }

    /** Verifies that {@code beginningOffsets} returns the prepared offset for each of three partitions. */
    @Test
    public void testBeginningOffsetsMultipleTopicPartitions() {
        buildFetcher();

        Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
        expectedOffsets.put(tp0, 2L);
        expectedOffsets.put(tp1, 4L);
        expectedOffsets.put(tp2, 6L);

        assignFromUser(expectedOffsets.keySet());
        // One response carries the offsets for all three partitions.
        client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, -2L, -1));

        Assertions.assertEquals(expectedOffsets,
            offsetFetcher.beginningOffsets(Arrays.asList(tp0, tp1, tp2), time.timer(5000L)));
    }

    /** Verifies that {@code beginningOffsets} with no partitions returns an empty map. */
    @Test
    public void testBeginningOffsetsEmpty() {
        buildFetcher();

        Assertions.assertEquals(
            Collections.emptyMap(),
            offsetFetcher.beginningOffsets(Collections.emptyList(), time.timer(5000L)));
    }

    /** Verifies that {@code endOffsets} returns the offset from the prepared response for a single partition. */
    @Test
    public void testEndOffsets() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        final long expectedOffset = 5L;
        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, -1L, expectedOffset));

        Assertions.assertEquals(
            Collections.singletonMap(tp0, expectedOffset),
            offsetFetcher.endOffsets(Collections.singleton(tp0), time.timer(5000L)));
    }

    /** Verifies that requesting the same partition twice in {@code endOffsets} yields a single map entry. */
    @Test
    public void testEndOffsetsDuplicateTopicPartition() {
        buildFetcher();
        assignFromUser(Collections.singleton(tp0));

        final long expectedOffset = 5L;
        client.prepareResponse(listOffsetResponse(tp0, Errors.NONE, -1L, expectedOffset));

        // tp0 is listed twice on purpose.
        Assertions.assertEquals(
            Collections.singletonMap(tp0, expectedOffset),
            offsetFetcher.endOffsets(Arrays.asList(tp0, tp0), time.timer(5000L)));
    }

    /** Verifies that {@code endOffsets} returns the prepared offset for each of three partitions. */
    @Test
    public void testEndOffsetsMultipleTopicPartitions() {
        buildFetcher();

        Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
        expectedOffsets.put(tp0, 5L);
        expectedOffsets.put(tp1, 7L);
        expectedOffsets.put(tp2, 9L);

        assignFromUser(expectedOffsets.keySet());
        // One response carries the offsets for all three partitions.
        client.prepareResponse(listOffsetResponse(expectedOffsets, Errors.NONE, -1L, -1));

        Assertions.assertEquals(expectedOffsets,
            offsetFetcher.endOffsets(Arrays.asList(tp0, tp1, tp2), time.timer(5000L)));
    }

    /**
     * Requesting end offsets for an empty partition list must yield an empty map.
     */
    @Test
    public void testEndOffsetsEmpty() {
        buildFetcher();
        Map<TopicPartition, Long> expected = Collections.emptyMap();
        Map<TopicPartition, Long> actual = this.offsetFetcher.endOffsets(
                Collections.emptyList(), this.time.timer(5000L));
        Assertions.assertEquals(expected, actual);
    }

    /**
     * Builds a matcher for an OffsetsForLeaderEpoch request that contains the given partition
     * with both its current leader epoch and its leader epoch equal to 1.
     *
     * @param topicPartition the partition that must be present in the request
     * @return a matcher that rejects requests lacking the partition or carrying other epochs
     */
    private MockClient.RequestMatcher offsetsForLeaderEpochRequestMatcher(TopicPartition topicPartition) {
        final int expectedCurrentLeaderEpoch = 1;
        final int expectedLeaderEpoch = 1;
        return request -> {
            OffsetsForLeaderEpochRequest epochRequest = (OffsetsForLeaderEpochRequest) request;
            OffsetForLeaderEpochRequestData.OffsetForLeaderPartition requested =
                    offsetForLeaderPartitionMap(epochRequest.data()).get(topicPartition);
            if (requested == null) {
                // The request does not mention the partition at all.
                return false;
            }
            return requested.currentLeaderEpoch() == expectedCurrentLeaderEpoch
                    && requested.leaderEpoch() == expectedLeaderEpoch;
        };
    }

    /**
     * Builds an error-free OffsetsForLeaderEpoch response containing a single partition result.
     *
     * @param topicPartition the partition the result is reported for
     * @param leaderEpoch the leader epoch to place in the partition result
     * @param endOffset the end offset to place in the partition result
     * @return a response with one topic holding one partition result
     */
    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(TopicPartition topicPartition, int leaderEpoch, long endOffset) {
        OffsetForLeaderEpochResponseData.EpochEndOffset partitionResult =
                new OffsetForLeaderEpochResponseData.EpochEndOffset()
                        .setPartition(topicPartition.partition())
                        .setErrorCode(Errors.NONE.code())
                        .setLeaderEpoch(leaderEpoch)
                        .setEndOffset(endOffset);
        OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topicResult =
                new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult()
                        .setTopic(topicPartition.topic())
                        .setPartitions(Collections.singletonList(partitionResult));
        OffsetForLeaderEpochResponseData responseData = new OffsetForLeaderEpochResponseData();
        responseData.topics().add(topicResult);
        return new OffsetsForLeaderEpochResponse(responseData);
    }

    /**
     * Flattens the topic/partition nesting of an OffsetsForLeaderEpoch request into a map keyed
     * by {@link TopicPartition}.
     *
     * @param offsetForLeaderEpochRequestData the request payload to index
     * @return a new mutable map from each topic-partition in the request to its partition entry;
     *         if the same topic-partition appears more than once, the last entry wins
     */
    private Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> offsetForLeaderPartitionMap(OffsetForLeaderEpochRequestData offsetForLeaderEpochRequestData) {
        // Parameterized (previously a raw HashMap) so the return needs no unchecked conversion.
        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsByTopicPartition = new HashMap<>();
        offsetForLeaderEpochRequestData.topics().forEach(topic ->
                topic.partitions().forEach(partition ->
                        partitionsByTopicPartition.put(new TopicPartition(topic.topic(), partition.partition()), partition)));
        return partitionsByTopicPartition;
    }

    /**
     * Builds a matcher for a ListOffsets request on {@code tp0} with the given timestamp and a
     * current leader epoch of {@code -1}.
     *
     * @param timestamp the timestamp the request's partition entry must carry
     * @return the matcher produced by the two-argument overload
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp) {
        final int expectedLeaderEpoch = -1;
        return listOffsetRequestMatcher(timestamp, expectedLeaderEpoch);
    }

    /**
     * Builds a matcher for a ListOffsets request whose first partition of its first topic is
     * {@code tp0} and carries the given timestamp and current leader epoch. Only that first
     * topic/partition entry is inspected.
     *
     * @param timestamp the timestamp the partition entry must carry
     * @param leaderEpoch the current leader epoch the partition entry must carry
     * @return a matcher accepting only requests that satisfy all four conditions
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp, int leaderEpoch) {
        return request -> {
            ListOffsetsRequest listOffsetsRequest = (ListOffsetsRequest) request;
            ListOffsetsRequestData.ListOffsetsTopic firstTopic =
                    (ListOffsetsRequestData.ListOffsetsTopic) listOffsetsRequest.topics().get(0);
            ListOffsetsRequestData.ListOffsetsPartition firstPartition =
                    (ListOffsetsRequestData.ListOffsetsPartition) firstTopic.partitions().get(0);
            if (!this.tp0.topic().equals(firstTopic.name())) {
                return false;
            }
            if (this.tp0.partition() != firstPartition.partitionIndex()) {
                return false;
            }
            if (timestamp != firstPartition.timestamp()) {
                return false;
            }
            return leaderEpoch == firstPartition.currentLeaderEpoch();
        };
    }

    private ListOffsetsResponse listOffsetResponse(Errors errors, long j, long j2) {
        return listOffsetResponse(this.tp0, errors, j, j2);
    }

    private ListOffsetsResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2) {
        return listOffsetResponse(topicPartition, errors, j, j2, -1);
    }

    private ListOffsetsResponse listOffsetResponse(TopicPartition topicPartition, Errors errors, long j, long j2, int i) {
        HashMap hashMap = new HashMap();
        hashMap.put(topicPartition, Long.valueOf(j2));
        return listOffsetResponse(hashMap, errors, j, i);
    }

    private ListOffsetsResponse listOffsetResponse(Map<TopicPartition, Long> map, Errors errors, long j, int i) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            hashMap.putIfAbsent(key.topic(), new ArrayList());
            ((List) hashMap.get(key.topic())).add(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(key.partition()).setErrorCode(errors.code()).setOffset(entry.getValue().longValue()).setTimestamp(j).setLeaderEpoch(i));
        }
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry2 : hashMap.entrySet()) {
            arrayList.add(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName((String) entry2.getKey()).setPartitions((List) entry2.getValue()));
        }
        return new ListOffsetsResponse(new ListOffsetsResponseData().setTopics(arrayList));
    }

    /**
     * Builds the fetcher under test using the {@code READ_UNCOMMITTED} isolation level.
     */
    private void buildFetcher() {
        IsolationLevel defaultIsolationLevel = IsolationLevel.READ_UNCOMMITTED;
        buildFetcher(defaultIsolationLevel);
    }

    /**
     * Builds the fetcher under test with the given reset strategy, a fresh {@link MetricConfig}
     * and the {@code READ_UNCOMMITTED} isolation level.
     *
     * @param offsetResetStrategy the reset strategy handed to the subscription state
     */
    private void buildFetcher(OffsetResetStrategy offsetResetStrategy) {
        MetricConfig metricConfig = new MetricConfig();
        buildFetcher(metricConfig, offsetResetStrategy, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Builds the fetcher under test with the given isolation level, a fresh {@link MetricConfig}
     * and the {@code EARLIEST} reset strategy.
     *
     * @param isolationLevel the isolation level handed to the fetcher
     */
    private void buildFetcher(IsolationLevel isolationLevel) {
        MetricConfig metricConfig = new MetricConfig();
        buildFetcher(metricConfig, OffsetResetStrategy.EARLIEST, isolationLevel);
    }

    /**
     * Builds the fetcher under test with a new {@link LogContext} and a new
     * {@link SubscriptionState} created from that context and the given reset strategy.
     * {@code Long.MAX_VALUE} is passed as the {@code long} argument of the five-argument overload.
     *
     * @param metricConfig the metric configuration for the test's {@code Metrics}
     * @param offsetResetStrategy the reset strategy for the new subscription state
     * @param isolationLevel the isolation level handed to the fetcher
     */
    private void buildFetcher(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, IsolationLevel isolationLevel) {
        LogContext logContext = new LogContext();
        SubscriptionState subscriptionState = new SubscriptionState(logContext, offsetResetStrategy);
        buildFetcher(metricConfig, isolationLevel, Long.MAX_VALUE, subscriptionState, logContext);
    }

    /**
     * Builds the test dependencies and then creates the {@code OffsetFetcher} under test from
     * the fields those dependencies populate.
     *
     * @param metricConfig the metric configuration forwarded to {@code buildDependencies}
     * @param isolationLevel the isolation level handed to the fetcher
     * @param metadataExpireMs forwarded to {@code buildDependencies}, which passes it as the
     *        second constructor argument of {@code ConsumerMetadata} (presumably the metadata
     *        expiry in milliseconds - confirm)
     * @param subscriptionState the subscription state forwarded to {@code buildDependencies}
     * @param logContext the log context shared by the dependencies and the fetcher
     */
    private void buildFetcher(MetricConfig metricConfig, IsolationLevel isolationLevel, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        // Must run first: it assigns the fields read by the constructor call below.
        buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
        // NOTE(review): 100L and 30000L look like the retry backoff and request timeout in
        // milliseconds - confirm against the OffsetFetcher constructor.
        this.offsetFetcher = new OffsetFetcher(
                logContext,
                this.consumerClient,
                this.metadata,
                this.subscriptions,
                this.time,
                100L,
                30000L,
                isolationLevel,
                this.apiVersions);
    }

    /**
     * Creates the mock time, consumer metadata, mock client, metrics and consumer network
     * client, and stores them (with the given subscription state) in this test's fields.
     *
     * @param metricConfig the configuration for the new {@code Metrics} instance
     * @param metadataExpireMs passed as the second constructor argument of
     *        {@code ConsumerMetadata} (presumably the metadata expiry in milliseconds - confirm)
     * @param subscriptionState the subscription state shared with the metadata
     * @param logContext the log context shared by the metadata and the network client
     */
    private void buildDependencies(MetricConfig metricConfig, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        // Construction order matches the dependency chain: time -> metadata -> client -> network client.
        MockTime mockTime = new MockTime(1L);
        this.time = mockTime;
        this.subscriptions = subscriptionState;

        ConsumerMetadata consumerMetadata = new ConsumerMetadata(
                0L, metadataExpireMs, false, false, subscriptionState, logContext, new ClusterResourceListeners());
        this.metadata = consumerMetadata;

        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) consumerMetadata);
        this.client = mockClient;

        this.metrics = new Metrics(metricConfig, mockTime);
        // NOTE(review): 100L, 1000 and Integer.MAX_VALUE look like timing settings of the network
        // client (backoff / timeout / max poll timeout) - confirm against its constructor.
        this.consumerClient = new ConsumerNetworkClient(
                logContext, mockClient, consumerMetadata, mockTime, 100L, 1000, Integer.MAX_VALUE);
    }
}
