package org.smartboot.mqtt.common;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttMessageType;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.exception.MqttException;
import org.smartboot.mqtt.common.message.MqttFixedHeader;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.MqttSubscribeMessage;
import org.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPubQosVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import org.smartboot.mqtt.common.message.variable.MqttSubscribeVariableHeader;
import org.smartboot.mqtt.common.message.variable.properties.ReasonProperties;
import org.smartboot.mqtt.common.util.MqttAttachKey;
import org.smartboot.mqtt.common.util.MqttMessageBuilders;
import org.smartboot.mqtt.common.util.MqttPropertyConstant;
import org.smartboot.mqtt.common.util.ValidateUtils;
import org.smartboot.socket.util.AttachKey;
import org.smartboot.socket.util.Attachment;

/* loaded from: input_file:org/smartboot/mqtt/common/InflightQueue.class */
public class InflightQueue {
    private static final Logger LOGGER = LoggerFactory.getLogger(InflightQueue.class);
    static final AttachKey<Runnable> RETRY_TASK_ATTACH_KEY = AttachKey.valueOf(MqttAttachKey.RETRY_TASK);
    private static final int TIMEOUT = 30;
    private final InflightMessage[] queue;
    private int takeIndex;
    private int putIndex;
    private int count;
    private final AbstractSession session;
    private final AtomicInteger packetId = new AtomicInteger(0);
    private final ReentrantLock lock = new ReentrantLock(false);
    private final Condition notFull = this.lock.newCondition();

    /* renamed from: org.smartboot.mqtt.common.InflightQueue$2, reason: invalid class name */
    /* loaded from: input_file:org/smartboot/mqtt/common/InflightQueue$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType = new int[MqttMessageType.values().length];

        static {
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[MqttMessageType.PUBACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[MqttMessageType.PUBREC.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[MqttMessageType.PUBCOMP.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[MqttMessageType.SUBACK.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[MqttMessageType.UNSUBACK.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    public InflightQueue(AbstractSession abstractSession, int i) {
        ValidateUtils.isTrue(i > 0, "inflight must >0");
        this.queue = new InflightMessage[i];
        this.session = abstractSession;
    }

    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> put(MqttMessageBuilders.MessageBuilder messageBuilder) {
        ReentrantLock reentrantLock = this.lock;
        try {
            try {
                reentrantLock.lockInterruptibly();
                while (this.count == this.queue.length) {
                    this.notFull.await();
                }
                CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> enqueue = enqueue(messageBuilder);
                reentrantLock.unlock();
                return enqueue;
            } catch (Exception e) {
                throw new MqttException("put message into inflight queue exception", e);
            }
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MqttMessageBuilders.MessageBuilder messageBuilder) {
        return offer(messageBuilder, mqttPacketIdentifierMessage -> {
        });
    }

    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> offer(MqttMessageBuilders.MessageBuilder messageBuilder, Consumer<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> consumer) {
        ReentrantLock reentrantLock = this.lock;
        reentrantLock.lock();
        try {
            if (this.count != this.queue.length) {
                CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> enqueue = enqueue(messageBuilder);
                reentrantLock.unlock();
                return enqueue;
            }
            int i = this.putIndex - 1;
            if (i < 0) {
                i = this.queue.length - 1;
            }
            this.queue[i].getFuture().thenAccept((Consumer<? super MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>>) consumer);
            reentrantLock.unlock();
            return null;
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    public CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> enqueue(MqttMessageBuilders.MessageBuilder messageBuilder) {
        int incrementAndGet = this.packetId.incrementAndGet();
        if (incrementAndGet > 65535) {
            incrementAndGet = (incrementAndGet % this.queue.length) + this.queue.length;
            this.packetId.set(incrementAndGet);
        }
        InflightMessage inflightMessage = new InflightMessage(incrementAndGet, messageBuilder.packetId(incrementAndGet).build());
        InflightMessage[] inflightMessageArr = this.queue;
        int i = this.putIndex;
        this.putIndex = i + 1;
        inflightMessageArr[i] = inflightMessage;
        if (this.putIndex == this.queue.length) {
            this.putIndex = 0;
        }
        this.count++;
        if (this.count == 1) {
            retry(inflightMessage);
        }
        this.session.write(inflightMessage.getOriginalMessage(), this.count == this.queue.length);
        return inflightMessage.getFuture();
    }

    private void retry(final InflightMessage inflightMessage) {
        if (inflightMessage.isCommit() || this.session.isDisconnect()) {
            return;
        }
        this.session.getTimer().schedule(new AsyncTask() { // from class: org.smartboot.mqtt.common.InflightQueue.1
            /* JADX WARN: Multi-variable type inference failed */
            @Override // org.smartboot.mqtt.common.AsyncTask
            public void execute() {
                if (inflightMessage.isCommit()) {
                    return;
                }
                if (InflightQueue.this.session.session.isInvalid()) {
                    InflightQueue.LOGGER.debug("session is disconnect , pause qos monitor.");
                    return;
                }
                long millis = (TimeUnit.SECONDS.toMillis(30L) - System.currentTimeMillis()) + inflightMessage.getLatestTime();
                if (millis > 0) {
                    InflightQueue.LOGGER.info("the time is not up, try again in {} milliseconds ", Long.valueOf(millis));
                    InflightQueue.this.session.getTimer().schedule(this, millis, TimeUnit.MILLISECONDS);
                    return;
                }
                inflightMessage.setLatestTime(System.currentTimeMillis());
                InflightQueue.LOGGER.info("message:{} time out,retry...", inflightMessage.getExpectMessageType());
                switch (AnonymousClass2.$SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[inflightMessage.getExpectMessageType().ordinal()]) {
                    case 1:
                    case MqttPropertyConstant.MESSAGE_EXPIRY_INTERVAL /* 2 */:
                        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) inflightMessage.getOriginalMessage();
                        InflightQueue.this.session.write(new MqttPublishMessage(new MqttFixedHeader(mqttPublishMessage.getFixedHeader().getMessageType(), true, mqttPublishMessage.getFixedHeader().getQosLevel(), mqttPublishMessage.getFixedHeader().isRetain()), (MqttPublishVariableHeader) mqttPublishMessage.getVariableHeader(), mqttPublishMessage.getPayload().getPayload()));
                        break;
                    case MqttPropertyConstant.CONTENT_TYPE /* 3 */:
                        ReasonProperties reasonProperties = null;
                        if (inflightMessage.getOriginalMessage().getVersion() == MqttVersion.MQTT_5) {
                            reasonProperties = new ReasonProperties();
                        }
                        InflightQueue.this.session.write(new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER_DUP, new MqttPubQosVariableHeader(((MqttPacketIdVariableHeader) inflightMessage.getOriginalMessage().getVariableHeader()).getPacketId(), reasonProperties)));
                        break;
                    case 4:
                        MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) inflightMessage.getOriginalMessage();
                        InflightQueue.this.session.write(new MqttSubscribeMessage(MqttFixedHeader.SUBSCRIBE_HEADER_DUP, (MqttSubscribeVariableHeader) mqttSubscribeMessage.getVariableHeader(), mqttSubscribeMessage.getPayload()));
                        break;
                    default:
                        throw new UnsupportedOperationException("invalid message type: " + inflightMessage.getExpectMessageType());
                }
                inflightMessage.setRetryCount(inflightMessage.getRetryCount() + 1);
                InflightQueue.this.session.getTimer().schedule(this, 30L, TimeUnit.SECONDS);
            }
        }, TimeUnit.SECONDS.toMillis(30L) - (System.currentTimeMillis() - inflightMessage.getLatestTime()), TimeUnit.MILLISECONDS);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void notify(MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader> mqttPacketIdentifierMessage) {
        InflightMessage inflightMessage = this.queue[(((MqttPacketIdVariableHeader) mqttPacketIdentifierMessage.getVariableHeader()).getPacketId() - 1) % this.queue.length];
        if (inflightMessage == null) {
            LOGGER.info("ignore duplicate message");
            return;
        }
        switch (AnonymousClass2.$SwitchMap$org$smartboot$mqtt$common$enums$MqttMessageType[mqttPacketIdentifierMessage.getFixedHeader().getMessageType().ordinal()]) {
            case 1:
            case MqttPropertyConstant.CONTENT_TYPE /* 3 */:
            case 4:
            case 5:
                if (mqttPacketIdentifierMessage.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType() && ((MqttPacketIdVariableHeader) mqttPacketIdentifierMessage.getVariableHeader()).getPacketId() == inflightMessage.getAssignedPacketId()) {
                    inflightMessage.setResponseMessage(mqttPacketIdentifierMessage);
                    inflightMessage.setLatestTime(System.currentTimeMillis());
                    ReentrantLock reentrantLock = this.lock;
                    reentrantLock.lock();
                    try {
                        commit(inflightMessage);
                        reentrantLock.unlock();
                        return;
                    } catch (Throwable th) {
                        reentrantLock.unlock();
                        throw th;
                    }
                }
                return;
            case MqttPropertyConstant.MESSAGE_EXPIRY_INTERVAL /* 2 */:
                if (mqttPacketIdentifierMessage.getFixedHeader().getMessageType() == inflightMessage.getExpectMessageType() && ((MqttPacketIdVariableHeader) mqttPacketIdentifierMessage.getVariableHeader()).getPacketId() == inflightMessage.getAssignedPacketId()) {
                    inflightMessage.setResponseMessage(mqttPacketIdentifierMessage);
                    inflightMessage.setLatestTime(System.currentTimeMillis());
                    inflightMessage.setExpectMessageType(MqttMessageType.PUBCOMP);
                    ReasonProperties reasonProperties = null;
                    if (mqttPacketIdentifierMessage.getVersion() == MqttVersion.MQTT_5) {
                        reasonProperties = new ReasonProperties();
                    }
                    this.session.write((MqttMessage) new MqttPubRelMessage(MqttFixedHeader.PUB_REL_HEADER, new MqttPubQosVariableHeader(((MqttPacketIdVariableHeader) mqttPacketIdentifierMessage.getVariableHeader()).getPacketId(), reasonProperties)), false);
                    return;
                }
                return;
            default:
                throw new RuntimeException(mqttPacketIdentifierMessage.toString());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void commit(InflightMessage inflightMessage) {
        MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader> originalMessage = inflightMessage.getOriginalMessage();
        ValidateUtils.isTrue(originalMessage.getFixedHeader().getQosLevel().value() == 0 || ((MqttPacketIdVariableHeader) originalMessage.getVariableHeader()).getPacketId() == inflightMessage.getAssignedPacketId(), "invalid message");
        inflightMessage.setCommit(true);
        if ((inflightMessage.getAssignedPacketId() - 1) % this.queue.length != this.takeIndex) {
            return;
        }
        InflightMessage[] inflightMessageArr = this.queue;
        int i = this.takeIndex;
        this.takeIndex = i + 1;
        inflightMessageArr[i] = null;
        this.count--;
        if (this.takeIndex == this.queue.length) {
            this.takeIndex = 0;
        }
        inflightMessage.getFuture().complete(inflightMessage.getResponseMessage());
        while (this.count > 0 && this.queue[this.takeIndex].isCommit()) {
            InflightMessage inflightMessage2 = this.queue[this.takeIndex];
            inflightMessage2.getFuture().complete(inflightMessage2.getResponseMessage());
            InflightMessage[] inflightMessageArr2 = this.queue;
            int i2 = this.takeIndex;
            this.takeIndex = i2 + 1;
            inflightMessageArr2[i2] = null;
            if (this.takeIndex == this.queue.length) {
                this.takeIndex = 0;
            }
            this.count--;
        }
        if (this.count < this.queue.length) {
            this.notFull.signal();
        }
        if (this.count > 0) {
            Attachment attachment = (Attachment) this.session.session.getAttachment();
            InflightMessage inflightMessage3 = this.queue[this.takeIndex];
            attachment.put(RETRY_TASK_ATTACH_KEY, () -> {
                this.session.getInflightQueue().retry(inflightMessage3);
            });
        }
    }
}
