package org.apache.kafka.coordinator.transaction;

import java.util.Collections;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/transaction/ProducerIdManagerTest.class */
public class ProducerIdManagerTest {
    private final NodeToControllerChannelManager brokerToController = (NodeToControllerChannelManager) Mockito.mock(NodeToControllerChannelManager.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/transaction/ProducerIdManagerTest$MockProducerIdManager.class */
    public class MockProducerIdManager extends RPCProducerIdManager {
        private final Queue<Errors> errorQueue;
        private final boolean isErroneousBlock;
        private final AtomicBoolean capturedFailure;
        private final ExecutorService brokerToControllerRequestExecutor;
        private final int idLen;
        private Long idStart;

        MockProducerIdManager(int i, long j, int i2, Queue<Errors> queue, boolean z, Time time) {
            super(i, time, () -> {
                return 1L;
            }, ProducerIdManagerTest.this.brokerToController);
            this.capturedFailure = new AtomicBoolean(false);
            this.brokerToControllerRequestExecutor = Executors.newSingleThreadExecutor();
            this.idStart = Long.valueOf(j);
            this.idLen = i2;
            this.errorQueue = queue;
            this.isErroneousBlock = z;
        }

        protected void sendRequest() {
            this.brokerToControllerRequestExecutor.submit(() -> {
                Errors poll = this.errorQueue.poll();
                if (poll != null && poll != Errors.NONE) {
                    handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData().setErrorCode(poll.code())));
                    return;
                }
                handleAllocateProducerIdsResponse(new AllocateProducerIdsResponse(new AllocateProducerIdsResponseData().setProducerIdStart(this.idStart.longValue()).setProducerIdLen(this.idLen)));
                if (this.isErroneousBlock) {
                    return;
                }
                this.idStart = Long.valueOf(this.idStart.longValue() + this.idLen);
            }, 0);
        }

        protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse allocateProducerIdsResponse) {
            super.handleAllocateProducerIdsResponse(allocateProducerIdsResponse);
            this.capturedFailure.set(this.nextProducerIdBlock.get() == null);
        }
    }

    /**
     * Calls {@code generateProducerId()} concurrently from several threads until
     * {@code 3 * idBlockLen} ids have been handed out, then verifies that every id was returned
     * exactly once and lies below {@code 3 * idBlockLen + numThreads}.
     *
     * @param idBlockLen length of each block served by the mock manager
     */
    @ParameterizedTest
    @ValueSource(ints = {1, 2, 10, 100})
    public void testConcurrentGeneratePidRequests(int idBlockLen) throws InterruptedException {
        final int numThreads = 5;
        final int expectedIds = idBlockLen * 3;
        final CountDownLatch remaining = new CountDownLatch(expectedIds);
        final MockProducerIdManager manager =
                new MockProducerIdManager(0, 0L, idBlockLen, new ConcurrentLinkedQueue<>(), false, Time.SYSTEM);
        final ConcurrentHashMap<Long, Integer> pidCounts = new ConcurrentHashMap<>();
        final ExecutorService requestHandlerPool = Executors.newFixedThreadPool(numThreads);
        try {
            for (int t = 0; t < numThreads; t++) {
                requestHandlerPool.submit(() -> {
                    while (remaining.getCount() > 0) {
                        try {
                            long pid = manager.generateProducerId();
                            // Guard the check-then-record so that no id is counted once the
                            // latch has already reached zero.
                            synchronized (pidCounts) {
                                if (remaining.getCount() != 0) {
                                    pidCounts.merge(pid, 1, Integer::sum);
                                    remaining.countDown();
                                }
                            }
                        } catch (Exception e) {
                            Assertions.assertEquals(CoordinatorLoadInProgressException.class, e.getClass());
                        }
                        Assertions.assertDoesNotThrow(() -> Thread.sleep(100L));
                    }
                });
            }
            Assertions.assertTrue(remaining.await(12000L, TimeUnit.MILLISECONDS));
        } finally {
            // Always stop the workers: if the latch never reaches zero they would loop forever.
            requestHandlerPool.shutdownNow();
        }

        Assertions.assertEquals(expectedIds, pidCounts.size());
        pidCounts.forEach((pid, count) -> {
            Assertions.assertEquals(1, count);
            Assertions.assertTrue(pid < (3L * idBlockLen) + numThreads, "Unexpected pid " + pid + "; non-contiguous blocks generated or did not fully exhaust blocks.");
        });
    }

    /**
     * Scripts the mock with one successful response followed by the given error, and checks that
     * a fresh block can still be obtained after the failure has been observed.
     *
     * @param errors the error code returned by the second simulated response
     */
    @ParameterizedTest
    @EnumSource(value = Errors.class, names = {"UNKNOWN_SERVER_ERROR", "INVALID_REQUEST"})
    public void testUnrecoverableErrors(Errors errors) throws Exception {
        final MockTime clock = new MockTime();
        final Queue<Errors> scriptedResponses = queue(Errors.NONE, errors);
        final MockProducerIdManager manager =
                new MockProducerIdManager(0, 0L, 1, scriptedResponses, false, clock);

        // The first response succeeds and yields block [0, 1).
        final ProducerIdsBlock firstBlock = new ProducerIdsBlock(0, 0L, 1);
        verifyNewBlockAndProducerId(manager, firstBlock, 0L);

        // Wait for the mock to record a failed response without requesting another id here.
        verifyFailureWithoutGenerateProducerId(manager);

        // Advance the mock clock before retrying (presumably past the retry backoff; confirm).
        clock.sleep(50L);

        final ProducerIdsBlock secondBlock = new ProducerIdsBlock(0, 1L, 1);
        verifyNewBlockAndProducerId(manager, secondBlock, 1L);
    }

    /**
     * Checks that a mock offering an invalid block range ends in the failure state, for three
     * ranges: a negative start, a negative length, and a start so close to
     * {@code Long.MAX_VALUE} that start plus length exceeds it.
     */
    @Test
    public void testInvalidRanges() throws InterruptedException {
        // Negative block start.
        verifyFailure(new MockProducerIdManager(0, -1L, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM));
        // Negative block length.
        verifyFailure(new MockProducerIdManager(0, 0L, -1, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM));
        // Start + length runs past Long.MAX_VALUE.
        verifyFailure(new MockProducerIdManager(0, Long.MAX_VALUE - 1, 10, new ConcurrentLinkedQueue<>(), true, Time.SYSTEM));
    }

    /**
     * Scripts a single {@code UNKNOWN_SERVER_ERROR} response and checks that id generation keeps
     * failing right after it, but succeeds once the mock clock has been advanced by 50 ms.
     */
    @Test
    public void testRetryBackoff() throws Exception {
        final MockTime clock = new MockTime();
        final MockProducerIdManager manager =
                new MockProducerIdManager(0, 0L, 1, queue(Errors.UNKNOWN_SERVER_ERROR), false, clock);

        // The scripted error is consumed and recorded as a failure.
        verifyFailure(manager);

        // With no mock time elapsed, asking again still fails.
        Assertions.assertThrows(CoordinatorLoadInProgressException.class, manager::generateProducerId);

        clock.sleep(50L);

        final ProducerIdsBlock expectedBlock = new ProducerIdsBlock(0, 0L, 1);
        verifyNewBlockAndProducerId(manager, expectedBlock, 0L);
    }

    /**
     * Builds a concurrent queue holding the given error codes in argument order.
     *
     * @param errorsArr the outcomes to enqueue; may be empty
     * @return a new {@link ConcurrentLinkedQueue} containing {@code errorsArr}
     */
    private Queue<Errors> queue(Errors... errorsArr) {
        Queue<Errors> scripted = new ConcurrentLinkedQueue<>();
        Collections.addAll(scripted, errorsArr);
        return scripted;
    }

    /**
     * Asserts that {@code generateProducerId()} throws {@link CoordinatorLoadInProgressException}
     * and then waits until the mock has recorded a failed response.
     *
     * @param mockProducerIdManager the manager under test
     * @throws InterruptedException if interrupted while waiting for the failure to be recorded
     */
    private void verifyFailure(MockProducerIdManager mockProducerIdManager) throws InterruptedException {
        // Evaluating the method reference already rejects a null manager.
        Assertions.assertThrows(
                CoordinatorLoadInProgressException.class,
                mockProducerIdManager::generateProducerId);
        verifyFailureWithoutGenerateProducerId(mockProducerIdManager);
    }

    /**
     * Waits until the mock's {@code capturedFailure} flag is set, then clears it so that the next
     * failure can be observed independently. Does not call {@code generateProducerId()} itself.
     *
     * @param mockProducerIdManager the manager under test
     * @throws InterruptedException if interrupted while waiting
     */
    private void verifyFailureWithoutGenerateProducerId(MockProducerIdManager mockProducerIdManager) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            // Read the flag while holding the manager's monitor.
            synchronized (mockProducerIdManager) {
                return mockProducerIdManager.capturedFailure.get();
            }
        }, "Expected failure");
        // Reset for the next verification.
        mockProducerIdManager.capturedFailure.set(false);
    }

    /**
     * Asserts that the first {@code generateProducerId()} call fails, waits until the manager's
     * next block equals the expected one, and then asserts the next generated id.
     *
     * @param mockProducerIdManager the manager under test
     * @param producerIdsBlock      the block expected to become available
     * @param j                     the producer id expected from the following call
     * @throws Exception if waiting is interrupted or id generation fails
     */
    private void verifyNewBlockAndProducerId(MockProducerIdManager mockProducerIdManager, ProducerIdsBlock producerIdsBlock, long j) throws Exception {
        // Evaluating the method reference already rejects a null manager.
        Assertions.assertThrows(
                CoordinatorLoadInProgressException.class,
                mockProducerIdManager::generateProducerId);

        TestUtils.waitForCondition(() -> {
            final ProducerIdsBlock current = (ProducerIdsBlock) mockProducerIdManager.nextProducerIdBlock.get();
            if (current == null) {
                return false;
            }
            return current.equals(producerIdsBlock);
        }, "failed to generate block");

        final long actualId = mockProducerIdManager.generateProducerId();
        Assertions.assertEquals(j, actualId);
    }
}
