package org.joyqueue.broker.mqtt.util;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.util.zip.CRC32;
import org.joyqueue.message.BrokerMessage;
import org.joyqueue.message.SourceType;
import org.joyqueue.toolkit.network.IpUtil;
import org.joyqueue.toolkit.time.SystemClock;

/* loaded from: input_file:org/joyqueue/broker/mqtt/util/MqttMessageSerializer.class */
public class MqttMessageSerializer {
    private static final int EXTENSION_QOS_LENGTH = 4;

    public static BrokerMessage convertToBrokerMsg(Channel channel, MqttPublishMessage mqttPublishMessage) {
        BrokerMessage brokerMessage = new BrokerMessage();
        long now = SystemClock.now();
        brokerMessage.setApp(NettyAttrManager.getAttrClientId(channel));
        brokerMessage.setTopic(mqttPublishMessage.variableHeader().topicName());
        brokerMessage.setCompressed(false);
        brokerMessage.setClientIp(IpUtil.toAddress(channel.remoteAddress()).getBytes());
        brokerMessage.setStartTime(now);
        brokerMessage.setSource(SourceType.MQTT.getValue());
        brokerMessage.setBusinessId(Integer.toString(mqttPublishMessage.variableHeader().packetId()));
        ByteBuf payload = mqttPublishMessage.payload();
        byte[] bArr = new byte[payload.readableBytes()];
        int readerIndex = payload.readerIndex();
        payload.readBytes(bArr);
        payload.readerIndex(readerIndex);
        brokerMessage.setBody(bArr);
        writeExtension(mqttPublishMessage.fixedHeader().qosLevel(), brokerMessage);
        CRC32 crc32 = new CRC32();
        crc32.update(brokerMessage.getBody().slice());
        brokerMessage.setBodyCRC(crc32.getValue());
        return brokerMessage;
    }

    public static void writeExtension(MqttQoS mqttQoS, BrokerMessage brokerMessage) {
        brokerMessage.setExtension(new byte[]{(byte) ((mqttQoS.value() >> 24) & 255), (byte) ((mqttQoS.value() >> 16) & 255), (byte) ((mqttQoS.value() >> 8) & 255), (byte) (mqttQoS.value() & 255)});
    }

    public static int readExtension(BrokerMessage brokerMessage) {
        byte[] extension = brokerMessage.getExtension();
        if (extension == null || extension.length != EXTENSION_QOS_LENGTH) {
            return 0;
        }
        return (extension[3] & 255) | ((extension[2] & 255) << 8) | ((extension[1] & 255) << 16) | ((extension[0] & 255) << 24);
    }

    public static int getLowerQos(int i, int i2) {
        return i < i2 ? i : i2;
    }
}
