package io.kroxylicious.proxy.internal.codec;

import edu.umd.cs.findbugs.annotations.NonNull;
import io.kroxylicious.proxy.frame.DecodedRequestFrame;
import io.kroxylicious.proxy.frame.Frame;
import io.kroxylicious.proxy.frame.OpaqueRequestFrame;
import io.kroxylicious.proxy.frame.RequestFrame;
import io.kroxylicious.proxy.internal.ApiVersionsServiceImpl;
import io.kroxylicious.proxy.internal.filter.ApiVersionsDowngradeFilter;
import io.kroxylicious.proxy.internal.util.Metrics;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Readable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/kroxylicious/proxy/internal/codec/KafkaRequestDecoder.class */
/**
 * Decodes Kafka request frames received from the downstream side of the proxy.
 *
 * <p>For every frame the API key, API version and correlation id are read. The configured
 * {@link DecodePredicate} then decides whether the request is fully decoded into a
 * {@link DecodedRequestFrame} or passed through as an {@link OpaqueRequestFrame} that merely
 * wraps the raw bytes.
 */
public class KafkaRequestDecoder extends KafkaMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaRequestDecoder.class);

    /** Decides, per API key and version, whether requests and responses get decoded. */
    private final DecodePredicate decodePredicate;

    /** Source of the highest version of each API that this proxy can handle. */
    private final ApiVersionsServiceImpl apiVersionsService;

    /**
     * Creates a request decoder.
     *
     * @param decodePredicate predicate consulted for every frame to choose between decoded and opaque handling
     * @param i value forwarded unchanged to the {@link KafkaMessageDecoder} constructor; its meaning is defined there
     * @param apiVersionsService service queried for the latest API version supported by the proxy
     */
    public KafkaRequestDecoder(DecodePredicate decodePredicate, int i, ApiVersionsServiceImpl apiVersionsService) {
        super(i);
        this.decodePredicate = decodePredicate;
        this.apiVersionsService = apiVersionsService;
    }

    @Override // io.kroxylicious.proxy.internal.codec.KafkaMessageDecoder
    protected Logger log() {
        return LOGGER;
    }

    /**
     * Decodes one request frame whose bytes start at the current reader index of {@code in}.
     *
     * @param ctx channel context, used only for logging here
     * @param in buffer positioned at the first byte of the request header (the API key)
     * @param length number of bytes occupied by the frame, counted from the reader index on entry
     * @return a {@link DecodedRequestFrame} when the predicate asks for decoding, otherwise an
     *         {@link OpaqueRequestFrame} holding a retained slice of the frame's bytes
     * @throws IllegalStateException if a request that must be decoded uses an API version above the
     *         proxy's maximum and is not an {@code API_VERSIONS} request
     */
    @Override // io.kroxylicious.proxy.internal.codec.KafkaMessageDecoder
    protected Frame decodeHeaderAndBody(ChannelHandlerContext ctx, ByteBuf in, int length) {
        final int startOfFrame = in.readerIndex();
        final short apiId = in.readShort();
        final ApiKeys apiKey = ApiKeys.forId(apiId);
        if (log().isTraceEnabled()) {
            log().trace("{}: apiKey: {} {}", ctx, apiId, apiKey);
        }
        final short apiVersion = in.readShort();
        if (log().isTraceEnabled()) {
            log().trace("{}: apiVersion: {}", ctx, apiVersion);
        }
        // Remember where the correlation id starts: readAcks() re-parses the header from here.
        final int startOfCorrelationId = in.readerIndex();
        final int correlationId = in.readInt();
        LOGGER.debug("{}: {} downstream correlation id: {}", ctx, apiKey, correlationId);

        Metrics.inboundDownstreamMessagesCounter().increment();
        final boolean decodeRequest = this.decodePredicate.shouldDecodeRequest(apiKey, apiVersion);
        LOGGER.debug("Decode {}/v{} request? {}, Predicate {} ", apiKey, apiVersion, decodeRequest, this.decodePredicate);
        final boolean decodeResponse = this.decodePredicate.shouldDecodeResponse(apiKey, apiVersion);
        LOGGER.debug("Decode {}/v{} response? {}, Predicate {}", apiKey, apiVersion, decodeResponse, this.decodePredicate);
        final short headerVersion = apiKey.requestHeaderVersion(apiVersion);

        final RequestFrame frame;
        if (decodeRequest) {
            Metrics.inboundDownstreamDecodedMessagesCounter().increment();
            Metrics.payloadSizeBytesUpstreamSummary(apiKey, apiVersion).record(length);
            if (log().isTraceEnabled()) {
                log().trace("{}: headerVersion {}", ctx, headerVersion);
            }
            // Rewind so the generated header reader sees the header from its first byte.
            in.readerIndex(startOfFrame);
            final ByteBufAccessorImpl accessor = new ByteBufAccessorImpl(in);
            final RequestHeaderData header = readHeader(headerVersion, accessor);
            if (log().isTraceEnabled()) {
                log().trace("{}: header: {}", ctx, header);
            }

            final short highestProxyVersion = this.apiVersionsService.latestVersion(apiKey);
            if (apiVersion > highestProxyVersion) {
                if (apiKey == ApiKeys.API_VERSIONS) {
                    // An ApiVersions request newer than the proxy understands is replaced by a v0 request.
                    return createV0ApiVersionRequestFrame(ctx, correlationId);
                }
                log().error("{}: apiVersion {} for {} ahead of proxy maximum: {}", ctx, apiVersion, apiKey, highestProxyVersion);
                throw new IllegalStateException(
                        "client apiVersion " + apiVersion + " ahead of proxy maximum " + highestProxyVersion + " for api key: " + apiKey);
            }

            final ApiMessage body = BodyDecoder.decodeRequest(apiKey, apiVersion, accessor);
            if (log().isTraceEnabled()) {
                log().trace("{}: body {}", ctx, body);
            }
            frame = new DecodedRequestFrame<>(apiVersion, correlationId, decodeResponse, header, body);
            if (log().isTraceEnabled()) {
                log().trace("{}: frame {}", ctx, frame);
            }
        }
        else {
            // Every request is answered, except a Produce request sent with acks == 0.
            boolean hasResponse = true;
            if (apiKey == ApiKeys.PRODUCE) {
                hasResponse = readAcks(in, startOfCorrelationId, apiKey.id, apiVersion) != 0;
            }
            in.readerIndex(startOfFrame);
            frame = opaqueFrame(in, correlationId, decodeResponse, length, hasResponse);
            in.readerIndex(startOfFrame + length);
        }
        return frame;
    }

    /**
     * Builds the v0 {@code ApiVersions} request frame used in place of a request whose version is
     * above the proxy's maximum.
     *
     * @param ctx channel context, used only for logging
     * @param correlationId correlation id passed on to the downgraded frame
     * @return the frame produced by {@link ApiVersionsDowngradeFilter#downgradeApiVersionsFrame}
     */
    @NonNull
    private DecodedRequestFrame<ApiVersionsRequestData> createV0ApiVersionRequestFrame(ChannelHandlerContext ctx, int correlationId) {
        if (log().isTraceEnabled()) {
            log().trace("{}: downgrading apiVersion request to v0", ctx);
        }
        return ApiVersionsDowngradeFilter.downgradeApiVersionsFrame(correlationId);
    }

    /** Moves the reader index of {@code in} by {@code byteCount} bytes relative to its current position. */
    private static void incrementReaderIndex(ByteBuf in, int byteCount) {
        in.readerIndex(in.readerIndex() + byteCount);
    }

    /**
     * Reads the {@code acks} field of a Produce request without decoding the whole request.
     *
     * <p>The reader index of {@code in} is moved; callers that need the original position must
     * restore it themselves.
     *
     * @param in buffer containing the request
     * @param startOfCorrelationId absolute index in {@code in} of the header's correlation id
     * @param apiKey numeric id of the request's API key, used to derive the header version
     * @param apiVersion Produce API version of the request
     * @return the value of the request's {@code acks} field
     * @throws AssertionError if {@code apiVersion} is greater than 11
     */
    static short readAcks(ByteBuf in, int startOfCorrelationId, short apiKey, short apiVersion) {
        in.readerIndex(startOfCorrelationId);
        final short headerVersion = ApiKeys.forId(apiKey).requestHeaderVersion(apiVersion);
        // Skip the 4-byte correlation id.
        incrementReaderIndex(in, 4);
        if (headerVersion >= 1) {
            // The client id is a nullable string with an int16 length; null is encoded as -1 with
            // no bytes following, so a negative length must not move the reader index.
            final short clientIdLength = in.readShort();
            if (clientIdLength > 0) {
                incrementReaderIndex(in, clientIdLength);
            }
        }
        if (headerVersion >= 2) {
            // Skip the header's tagged fields: a count, then (tag, size, payload) per field.
            final int taggedFieldCount = ByteBufAccessorImpl.readUnsignedVarint(in);
            for (int field = 0; field < taggedFieldCount; field++) {
                ByteBufAccessorImpl.readUnsignedVarint(in); // tag, not needed
                incrementReaderIndex(in, ByteBufAccessorImpl.readUnsignedVarint(in));
            }
        }
        if (apiVersion >= 3) {
            // Skip the transactional id, which precedes acks from v3 onwards.
            final int transactionalIdLength;
            if (apiVersion < 9) {
                // Nullable string: int16 length, -1 meaning null.
                transactionalIdLength = in.readShort();
            }
            else if (apiVersion <= 11) {
                // Compact nullable string: the varint carries length + 1, and 0 means null.
                transactionalIdLength = ByteBufAccessorImpl.readUnsignedVarint(in) - 1;
            }
            else {
                throw new AssertionError("Unsupported Produce apiVersion: " + apiVersion);
            }
            if (transactionalIdLength > 0) {
                incrementReaderIndex(in, transactionalIdLength);
            }
        }
        return in.readShort();
    }

    /**
     * Wraps the next {@code length} bytes of {@code in} in an opaque frame.
     *
     * <p>The bytes are taken with {@code readSlice}, which advances the reader index by
     * {@code length}, and the slice is retained before being handed to the frame.
     */
    private OpaqueRequestFrame opaqueFrame(ByteBuf in, int correlationId, boolean decodeResponse, int length, boolean hasResponse) {
        return new OpaqueRequestFrame(in.readSlice(length).retain(), correlationId, decodeResponse, length, hasResponse);
    }

    /** Reads a request header of the given header version from {@code accessor}. */
    private RequestHeaderData readHeader(short headerVersion, Readable accessor) {
        return new RequestHeaderData(accessor, headerVersion);
    }
}
