package org.joyqueue.client.internal.consumer.converter.kafka;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.joyqueue.client.internal.consumer.converter.MessageConverter;
import org.joyqueue.client.internal.consumer.converter.kafka.compressor.KafkaCompressionCodec;
import org.joyqueue.client.internal.consumer.converter.kafka.compressor.KafkaCompressionCodecFactory;
import org.joyqueue.client.internal.consumer.converter.kafka.compressor.stream.ByteBufferInputStream;
import org.joyqueue.client.internal.exception.ClientException;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;
import org.joyqueue.shaded.com.google.common.base.Charsets;
import org.joyqueue.shaded.com.google.common.collect.Lists;
import org.joyqueue.shaded.com.google.common.collect.Maps;
import org.joyqueue.shaded.org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/joyqueue/client/internal/consumer/converter/kafka/KafkaMessageConverter.class */
/**
 * {@link MessageConverter} registered for the {@link SourceType#KAFKA} source type (see {@link #type()}).
 *
 * <p>The converter inspects the message extension to find out which compression codec, if any,
 * was applied to the body, decompresses the body when needed, and, for batches, splits the body
 * into one {@link BrokerMessage} per contained record.
 *
 * <p>Extension layout, as expressed by the offset constants below: magic byte at offset 0,
 * timestamp at offset 1, attribute at offset 9, 17 bytes in total. The low three bits of the
 * attribute select the compression codec.
 */
public class KafkaMessageConverter implements MessageConverter {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    private static final byte MESSAGE_MAGIC_V0 = 0;
    private static final byte MESSAGE_MAGIC_V1 = 1;
    private static final byte MESSAGE_MAGIC_V2 = 2;
    private static final byte MESSAGE_CURRENT_MAGIC = 2;
    private static final int EXTENSION_V0_LENGTH = 1;
    private static final int EXTENSION_V1_LENGTH = 17;
    private static final int EXTENSION_CURRENT_LENGTH = 17;
    private static final int EXTENSION_BATCH_V0_LENGTH = 1;
    private static final int EXTENSION_BATCH_V1_LENGTH = 17;
    private static final int EXTENSION_CURRENT_BATCH_LENGTH = 17;
    private static final int EXTENSION_MAGIC_OFFSET = 0;
    private static final int EXTENSION_TIMESTAMP_OFFSET = 1;
    private static final int EXTENSION_ATTRIBUTE_OFFSET = 9;
    private static final byte COMPRESSION_CODEC_MASK = 7;
    private static final int DECOMPRESS_BUFFER_SIZE = 1024;

    /**
     * Converts a single (non-batch) message.
     *
     * <p>The message is returned untouched when its extension is missing, does not have the
     * current extension length, or declares no compression. Otherwise the body is decompressed
     * and the business id and body of the same instance are replaced with the decompressed
     * key and value.
     *
     * @param brokerMessage message to convert; modified in place when it is compressed
     * @return the given message instance
     * @throws ClientException if decompression fails
     */
    @Override
    public BrokerMessage convert(BrokerMessage brokerMessage) {
        byte[] extension = brokerMessage.getExtension();
        if (!hasExtensionOfLength(extension, EXTENSION_CURRENT_LENGTH)) {
            return brokerMessage;
        }
        byte magic = extension[EXTENSION_MAGIC_OFFSET];
        KafkaCompressionCodec codec = readCompressionCodec(extension);
        if (codec.equals(KafkaCompressionCodec.NoCompressionCodec)) {
            return brokerMessage;
        }
        byte[] decompressed = decompress(codec, brokerMessage.getBody(), magic);
        return readDecompressedMessage(ByteBuffer.wrap(decompressed), brokerMessage, magic);
    }

    /**
     * Reads the first entry of a decompressed payload and copies its key and value onto the
     * given message.
     *
     * @param byteBuffer    decompressed payload, positioned at the start of the entry
     * @param brokerMessage message whose business id and body are overwritten
     * @param b             message magic byte; from {@code MESSAGE_MAGIC_V1} on an extra
     *                      8-byte field is skipped before the key
     * @return the given message instance
     */
    protected BrokerMessage readDecompressedMessage(ByteBuffer byteBuffer, BrokerMessage brokerMessage, byte b) {
        // Fixed-size header fields are skipped, not validated.
        // NOTE(review): presumably offset, size, CRC, magic and attributes of a Kafka
        // legacy message entry - confirm against the producing side.
        byteBuffer.getLong();
        byteBuffer.getInt();
        byteBuffer.getInt();
        byteBuffer.get();
        byteBuffer.get();
        if (b >= MESSAGE_MAGIC_V1) {
            // NOTE(review): presumably the timestamp introduced with magic v1 - confirm.
            byteBuffer.getLong();
        }
        byte[] key = KafkaBufferUtils.readBytes(byteBuffer);
        byte[] value = KafkaBufferUtils.readBytes(byteBuffer);
        // Null and empty arrays are both normalised to null.
        brokerMessage.setBusinessId(ArrayUtils.isNotEmpty(key) ? new String(key, Charsets.UTF_8) : null);
        brokerMessage.setBody(ArrayUtils.isNotEmpty(value) ? value : null);
        return brokerMessage;
    }

    /**
     * Splits a batch message into its individual records.
     *
     * <p>The flag of the batch message is used as the number of contained records.
     *
     * @param brokerMessage batch message
     * @return one message per record, in the order they appear in the body
     * @throws ClientException if decompression fails
     */
    @Override
    public List<BrokerMessage> convertBatch(BrokerMessage brokerMessage) {
        int recordCount = brokerMessage.getFlag();
        List<BrokerMessage> records = Lists.newArrayListWithCapacity(recordCount);
        // All records share one buffer; each doConvertBatch call consumes exactly one record.
        ByteBuffer buffer = ByteBuffer.wrap(tryDecompress(brokerMessage));
        for (int index = 0; index < recordCount; index++) {
            records.add(doConvertBatch(brokerMessage, buffer, index));
        }
        return records;
    }

    /**
     * Returns the body of a batch message, decompressed when the extension declares a codec.
     *
     * @param brokerMessage batch message
     * @return the raw body when there is no usable extension or no compression, otherwise the
     *         decompressed body
     * @throws ClientException if decompression fails
     */
    protected byte[] tryDecompress(BrokerMessage brokerMessage) {
        byte[] extension = brokerMessage.getExtension();
        if (!hasExtensionOfLength(extension, EXTENSION_CURRENT_BATCH_LENGTH)) {
            return brokerMessage.getByteBody();
        }
        byte magic = extension[EXTENSION_MAGIC_OFFSET];
        KafkaCompressionCodec codec = readCompressionCodec(extension);
        if (codec.equals(KafkaCompressionCodec.NoCompressionCodec)) {
            return brokerMessage.getByteBody();
        }
        return decompress(codec, ByteBuffer.wrap(brokerMessage.getByteBody()), magic);
    }

    /**
     * Decompresses the remaining content of the buffer with the given codec.
     *
     * @param kafkaCompressionCodec codec used to build the decompressing stream
     * @param byteBuffer            compressed data
     * @param b                     message magic byte, passed on to the codec factory
     * @return the decompressed bytes
     * @throws ClientException wrapping any exception raised while creating, reading or closing
     *                         the stream
     */
    protected byte[] decompress(KafkaCompressionCodec kafkaCompressionCodec, ByteBuffer byteBuffer, byte b) {
        // try-with-resources closes the stream exactly once and keeps the original failure as
        // the primary exception if close() fails as well.
        try (InputStream in = KafkaCompressionCodecFactory.apply(kafkaCompressionCodec, new ByteBufferInputStream(byteBuffer), b)) {
            byte[] chunk = new byte[DECOMPRESS_BUFFER_SIZE];
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            int read;
            while ((read = in.read(chunk)) > 0) {
                out.write(chunk, 0, read);
            }
            return out.toByteArray();
        } catch (Exception e) {
            logger.error("decompress exception, kafkaCompressionCodec: {}, messageMagic: {}", kafkaCompressionCodec, b, e);
            throw new ClientException(e);
        }
    }

    /**
     * Reads the next record from the buffer and builds a message for it, inheriting the
     * envelope fields (topic, app, partition, times, ...) from the batch message.
     *
     * @param brokerMessage batch message the record belongs to
     * @param byteBuffer    decompressed batch body, positioned at the start of the record
     * @param i             zero-based position of the record inside the batch; added to the
     *                      batch index to form the record index
     * @return the message built from the record
     */
    protected BrokerMessage doConvertBatch(BrokerMessage brokerMessage, ByteBuffer byteBuffer, int i) {
        BrokerMessage record = new BrokerMessage();

        // Buffer reads must stay in this exact order; they follow the record layout.
        record.setSize(KafkaBufferUtils.readVarint(byteBuffer));
        // The next three fields are skipped.
        // NOTE(review): presumably attributes, timestamp delta and offset delta of a Kafka
        // v2 record - confirm.
        byteBuffer.get();
        KafkaBufferUtils.readVarlong(byteBuffer);
        KafkaBufferUtils.readVarint(byteBuffer);
        byte[] key = KafkaBufferUtils.readVarBytes(byteBuffer);
        byte[] value = KafkaBufferUtils.readVarBytes(byteBuffer);

        record.setMsgIndexNo(i + brokerMessage.getMsgIndexNo());
        record.setBusinessId(ArrayUtils.isEmpty(key) ? null : new String(key, Charsets.UTF_8));
        record.setBody(value);

        // Fields copied from the batch envelope.
        record.setTopic(brokerMessage.getTopic());
        record.setApp(brokerMessage.getApp());
        record.setPartition(brokerMessage.getPartition());
        // Replaced below when the record carries headers of its own.
        record.setAttributes(brokerMessage.getAttributes());
        record.setStartTime(brokerMessage.getStartTime());
        record.setStoreTime(brokerMessage.getStoreTime());
        record.setFlag(brokerMessage.getFlag());
        record.setClientIp(brokerMessage.getClientIp());
        record.setPriority(brokerMessage.getPriority());
        record.setOrdered(brokerMessage.isOrdered());
        record.setSource(SourceType.KAFKA.getValue());
        record.setCompressed(false);
        record.setBatch(true);

        int headerCount = KafkaBufferUtils.readVarint(byteBuffer);
        if (headerCount > 0) {
            HashMap<String, String> headers = Maps.newHashMap();
            for (int h = 0; h < headerCount; h++) {
                byte[] headerKey = KafkaBufferUtils.readVarBytes(byteBuffer);
                byte[] headerValue = KafkaBufferUtils.readVarBytes(byteBuffer);
                // The record key above is already guarded against a null array, so a header
                // value may be null too; keep it as a null attribute instead of failing.
                // NOTE(review): confirm that readVarBytes can return null.
                headers.put(new String(headerKey, Charsets.UTF_8),
                        headerValue == null ? null : new String(headerValue, Charsets.UTF_8));
            }
            record.setAttributes(headers);
        }
        return record;
    }

    /**
     * Returns the source type this converter handles.
     *
     * @return the value of {@link SourceType#KAFKA}
     */
    @Override
    public Byte type() {
        return Byte.valueOf(SourceType.KAFKA.getValue());
    }

    /** Tells whether the extension is present and has exactly the expected length. */
    private static boolean hasExtensionOfLength(byte[] extension, int expectedLength) {
        return !ArrayUtils.isEmpty(extension) && extension.length == expectedLength;
    }

    /** Extracts the compression codec from the low bits of the extension's attribute field. */
    private static KafkaCompressionCodec readCompressionCodec(byte[] extension) {
        short attribute = (short) KafkaBufferUtils.readUnsignedLongLE(extension, EXTENSION_ATTRIBUTE_OFFSET);
        return KafkaCompressionCodec.valueOf(attribute & COMPRESSION_CODEC_MASK);
    }
}
