package org.apache.kafka.coordinator.transaction;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AllocateProducerIdsRequest;
import org.apache.kafka.common.requests.AllocateProducerIdsResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
import org.apache.kafka.server.common.NodeToControllerChannelManager;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/coordinator/transaction/RPCProducerIdManager.class */
public class RPCProducerIdManager implements ProducerIdManager {
    static final int RETRY_BACKOFF_MS = 50;
    private static final double PID_PREFETCH_THRESHOLD = 0.9d;
    private static final int ITERATION_LIMIT = 3;
    private static final long NO_RETRY = -1;
    private static final Logger log = LoggerFactory.getLogger(RPCProducerIdManager.class);
    private final String logPrefix;
    private final int brokerId;
    private final Time time;
    private final Supplier<Long> brokerEpochSupplier;
    private final NodeToControllerChannelManager controllerChannel;
    final AtomicReference<ProducerIdsBlock> nextProducerIdBlock = new AtomicReference<>(null);
    private final AtomicReference<ProducerIdsBlock> currentProducerIdBlock = new AtomicReference<>(ProducerIdsBlock.EMPTY);
    private final AtomicBoolean requestInFlight = new AtomicBoolean(false);
    private final AtomicLong backoffDeadlineMs = new AtomicLong(NO_RETRY);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.coordinator.transaction.RPCProducerIdManager$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/coordinator/transaction/RPCProducerIdManager$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.STALE_BROKER_EPOCH.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.BROKER_ID_NOT_REGISTERED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public RPCProducerIdManager(int i, Time time, Supplier<Long> supplier, NodeToControllerChannelManager nodeToControllerChannelManager) {
        this.brokerId = i;
        this.time = time;
        this.brokerEpochSupplier = supplier;
        this.controllerChannel = nodeToControllerChannelManager;
        this.logPrefix = "[RPC ProducerId Manager " + i + "]: ";
    }

    @Override // org.apache.kafka.coordinator.transaction.ProducerIdManager
    public long generateProducerId() {
        for (int i = 0; i <= 3; i++) {
            Optional claimNextId = this.currentProducerIdBlock.get().claimNextId();
            if (claimNextId.isPresent()) {
                long longValue = ((Long) claimNextId.get()).longValue();
                if (longValue == this.currentProducerIdBlock.get().firstProducerId() + ((long) (this.currentProducerIdBlock.get().size() * PID_PREFETCH_THRESHOLD))) {
                    maybeRequestNextBlock();
                }
                return longValue;
            }
            ProducerIdsBlock andSet = this.nextProducerIdBlock.getAndSet(null);
            if (andSet == null) {
                maybeRequestNextBlock();
                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block");
            }
            this.currentProducerIdBlock.set(andSet);
            this.requestInFlight.set(false);
        }
        throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception("Producer ID block is full. Waiting for next block");
    }

    private void maybeRequestNextBlock() {
        long j = this.backoffDeadlineMs.get();
        if ((j == NO_RETRY || this.time.milliseconds() >= j) && this.nextProducerIdBlock.get() == null && this.requestInFlight.compareAndSet(false, true)) {
            sendRequest();
            this.backoffDeadlineMs.set(NO_RETRY);
        }
    }

    protected void sendRequest() {
        AllocateProducerIdsRequest.Builder builder = new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData().setBrokerEpoch(this.brokerEpochSupplier.get().longValue()).setBrokerId(this.brokerId));
        log.debug("{} Requesting next Producer ID block", this.logPrefix);
        this.controllerChannel.sendRequest(builder, new ControllerRequestCompletionHandler() { // from class: org.apache.kafka.coordinator.transaction.RPCProducerIdManager.1
            public void onComplete(ClientResponse clientResponse) {
                if (clientResponse.responseBody() instanceof AllocateProducerIdsResponse) {
                    RPCProducerIdManager.this.handleAllocateProducerIdsResponse((AllocateProducerIdsResponse) clientResponse.responseBody());
                }
            }

            public void onTimeout() {
                RPCProducerIdManager.log.warn("{} Timed out when requesting AllocateProducerIds from the controller.", RPCProducerIdManager.this.logPrefix);
                RPCProducerIdManager.this.requestInFlight.set(false);
            }
        });
    }

    protected void handleAllocateProducerIdsResponse(AllocateProducerIdsResponse allocateProducerIdsResponse) {
        AllocateProducerIdsResponseData data = allocateProducerIdsResponse.data();
        boolean z = false;
        Errors forCode = Errors.forCode(data.errorCode());
        switch (AnonymousClass2.$SwitchMap$org$apache$kafka$common$protocol$Errors[forCode.ordinal()]) {
            case 1:
                log.debug("{} Got next producer ID block from controller {}", this.logPrefix, data);
                z = sanityCheckResponse(data);
                break;
            case TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_DEFAULT /* 2 */:
                log.warn("{} Our broker currentBlockCount was stale, trying again.", this.logPrefix);
                break;
            case 3:
                log.warn("{} Our broker ID is not yet known by the controller, trying again.", this.logPrefix);
                break;
            default:
                log.error("{} Received error code {} from the controller.", this.logPrefix, forCode);
                break;
        }
        if (z) {
            return;
        }
        this.backoffDeadlineMs.set(this.time.milliseconds() + 50);
        this.requestInFlight.set(false);
    }

    private boolean sanityCheckResponse(AllocateProducerIdsResponseData allocateProducerIdsResponseData) {
        if (allocateProducerIdsResponseData.producerIdStart() < this.currentProducerIdBlock.get().lastProducerId()) {
            log.error("{} Producer ID block is not monotonic with current block: current={} response={}", new Object[]{this.logPrefix, this.currentProducerIdBlock.get(), allocateProducerIdsResponseData});
            return false;
        }
        if (allocateProducerIdsResponseData.producerIdStart() < 0 || allocateProducerIdsResponseData.producerIdLen() < 0 || allocateProducerIdsResponseData.producerIdStart() > Long.MAX_VALUE - allocateProducerIdsResponseData.producerIdLen()) {
            log.error("{} Producer ID block includes invalid ID range: {}", this.logPrefix, allocateProducerIdsResponseData);
            return false;
        }
        this.nextProducerIdBlock.set(new ProducerIdsBlock(this.brokerId, allocateProducerIdsResponseData.producerIdStart(), allocateProducerIdsResponseData.producerIdLen()));
        return true;
    }
}
