package org.smartboot.mqtt.common;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.message.MqttMessage;
import org.smartboot.mqtt.common.message.MqttPubAckMessage;
import org.smartboot.mqtt.common.message.MqttPubCompMessage;
import org.smartboot.mqtt.common.message.MqttPubRecMessage;
import org.smartboot.mqtt.common.message.MqttPubRelMessage;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.util.ValidateUtils;

/* loaded from: input_file:org/smartboot/mqtt/common/QosPublisher.class */
public abstract class QosPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(QosPublisher.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publishQos1(AbstractSession abstractSession, MqttPublishMessage mqttPublishMessage, Consumer<Integer> consumer, boolean z) {
        ValidateUtils.notNull(Boolean.valueOf(mqttPublishMessage.getFixedHeader().getQosLevel() == MqttQoS.AT_LEAST_ONCE), "qos is null");
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        Integer valueOf = Integer.valueOf(mqttPublishMessage.getVariableHeader().getPacketId());
        abstractSession.responseConsumers.put(valueOf, new AckMessage(mqttPublishMessage, mqttPacketIdentifierMessage -> {
            ValidateUtils.isTrue(mqttPacketIdentifierMessage instanceof MqttPubAckMessage, "invalid message type");
            completableFuture.complete(true);
            abstractSession.responseConsumers.remove(valueOf);
            LOGGER.info("Qos1消息发送成功...");
            consumer.accept(Integer.valueOf(mqttPublishMessage.getVariableHeader().getPacketId()));
        }));
        abstractSession.write(mqttPublishMessage, z);
        retry(completableFuture, abstractSession, mqttPublishMessage);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void publishQos2(AbstractSession abstractSession, MqttPublishMessage mqttPublishMessage, Consumer<Integer> consumer, boolean z) {
        ValidateUtils.notNull(Boolean.valueOf(mqttPublishMessage.getFixedHeader().getQosLevel() == MqttQoS.EXACTLY_ONCE), "qos is null");
        Integer valueOf = Integer.valueOf(mqttPublishMessage.getVariableHeader().getPacketId());
        CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
        abstractSession.responseConsumers.put(valueOf, new AckMessage(mqttPublishMessage, mqttPacketIdentifierMessage -> {
            ValidateUtils.isTrue(mqttPacketIdentifierMessage instanceof MqttPubRecMessage, "invalid message type");
            ValidateUtils.isTrue(Objects.equals(Integer.valueOf(mqttPacketIdentifierMessage.getVariableHeader().getPacketId()), Integer.valueOf(mqttPublishMessage.getVariableHeader().getPacketId())), "invalid packetId");
            completableFuture.complete(true);
            MqttPubRelMessage mqttPubRelMessage = new MqttPubRelMessage(mqttPacketIdentifierMessage.getVariableHeader().getPacketId());
            CompletableFuture<Boolean> completableFuture2 = new CompletableFuture<>();
            abstractSession.responseConsumers.put(valueOf, new AckMessage(mqttPubRelMessage, mqttPacketIdentifierMessage -> {
                ValidateUtils.isTrue(mqttPacketIdentifierMessage instanceof MqttPubCompMessage, "invalid message type");
                ValidateUtils.isTrue(Objects.equals(Integer.valueOf(mqttPacketIdentifierMessage.getVariableHeader().getPacketId()), Integer.valueOf(mqttPubRelMessage.getVariableHeader().getPacketId())), "invalid packetId");
                completableFuture2.complete(true);
                LOGGER.info("Qos2消息发送成功...");
                consumer.accept(Integer.valueOf(mqttPacketIdentifierMessage.getVariableHeader().getPacketId()));
            }));
            retry(completableFuture2, abstractSession, mqttPubRelMessage);
            abstractSession.write(mqttPubRelMessage);
        }));
        abstractSession.write((MqttMessage) mqttPublishMessage, false);
        retry(completableFuture, abstractSession, mqttPublishMessage);
    }

    protected abstract void retry(CompletableFuture<Boolean> completableFuture, AbstractSession abstractSession, MqttMessage mqttMessage);
}
