package org.smartboot.mqtt.broker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.provider.PersistenceProvider;
import org.smartboot.mqtt.broker.provider.impl.message.PersistenceMessage;
import org.smartboot.mqtt.common.InflightQueue;
import org.smartboot.mqtt.common.MqttMessageBuilders;
import org.smartboot.mqtt.common.TopicToken;
import org.smartboot.mqtt.common.enums.MqttQoS;
import org.smartboot.mqtt.common.enums.MqttVersion;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPublishMessage;
import org.smartboot.mqtt.common.message.variable.properties.PublishProperties;

/* loaded from: input_file:org/smartboot/mqtt/broker/TopicSubscriber.class */
/**
 * Tracks one session's subscription to one broker topic and pushes persisted
 * messages of that topic to the session.
 *
 * <p>The subscriber remembers the offset of the next message to read from the
 * {@link PersistenceProvider} and advances it as messages are handed to the
 * session in {@link #batchPublish(BrokerContext)}.
 *
 * <p>NOTE(review): no synchronization is performed here; the expected threading
 * model is not visible from this class and should be confirmed against callers.
 */
public class TopicSubscriber {
    private static final Logger LOGGER = LoggerFactory.getLogger(TopicSubscriber.class);

    /**
     * Publishing stops once the zero-based attempt index exceeds this value, so a
     * single {@link #batchPublish(BrokerContext)} call sends at most
     * {@code BATCH_INDEX_LIMIT + 1} messages.
     */
    private static final int BATCH_INDEX_LIMIT = 16;

    /** A single publish call taking longer than this many milliseconds is logged as slow. */
    private static final long SLOW_PUBLISH_THRESHOLD_MILLIS = 100;

    private final MqttSession mqttSession;
    private final BrokerTopic topic;
    private final MqttQoS mqttQoS;
    /** Offset of the next message to fetch from the persistence provider. */
    private long nextConsumerOffset;
    private long retainConsumerOffset;
    private TopicToken topicFilterToken;
    /** Wall-clock time (epoch millis) captured when this instance was created. */
    private final long latestSubscribeTime = System.currentTimeMillis();
    /**
     * Topic version observed just before the last fetch that found no message at
     * the requested offset; {@code -1} until that happens for the first time.
     */
    private int pushVersion = -1;
    private boolean ready = false;

    /**
     * Creates a subscriber bound to the given topic and session.
     *
     * @param brokerTopic the topic whose messages are pushed
     * @param mqttSession the session that receives the messages
     * @param mqttQoS     the QoS applied to every message pushed by this subscriber
     * @param j           initial value of the next consumer offset
     * @param j2          initial value of the retain consumer offset
     */
    public TopicSubscriber(BrokerTopic brokerTopic, MqttSession mqttSession, MqttQoS mqttQoS, long j, long j2) {
        this.topic = brokerTopic;
        this.mqttSession = mqttSession;
        this.mqttQoS = mqttQoS;
        this.nextConsumerOffset = j;
        this.retainConsumerOffset = j2;
    }

    /**
     * Pushes a bounded batch of persisted messages, starting at the current next
     * consumer offset, and stores the offset at which the batch stopped.
     *
     * @param brokerContext supplies the persistence provider and the event bus
     */
    public void batchPublish(BrokerContext brokerContext) {
        this.nextConsumerOffset = publish0(brokerContext, 0, this.nextConsumerOffset);
    }

    /**
     * Publishes consecutive persisted messages until one of the stop conditions is
     * hit, flushing the session before returning.
     *
     * <p>Stop conditions, checked in this order for every offset:
     * <ol>
     *   <li>no message is stored at the offset (also records the topic version in
     *       {@code pushVersion});</li>
     *   <li>the attempt index exceeds {@link #BATCH_INDEX_LIMIT};</li>
     *   <li>the in-flight queue refuses the message ({@code offer} returns {@code -1}).</li>
     * </ol>
     *
     * @param brokerContext supplies the persistence provider and the event bus
     * @param startIndex    zero-based attempt index to start counting from
     * @param startOffset   offset of the first message to fetch
     * @return the offset of the first message that was not published
     */
    private long publish0(BrokerContext brokerContext, int startIndex, long startOffset) {
        int index = startIndex;
        long offset = startOffset;
        while (true) {
            PersistenceProvider persistenceProvider = brokerContext.getProviders().getPersistenceProvider();
            // Read the version before fetching so that pushVersion never reflects a
            // version newer than the state the fetch actually observed.
            int observedVersion = this.topic.getVersion().get();
            PersistenceMessage persistenceMessage = persistenceProvider.get(this.topic.getTopic(), offset);
            if (persistenceMessage == null) {
                this.pushVersion = observedVersion;
                this.mqttSession.flush();
                return offset;
            }
            if (index > BATCH_INDEX_LIMIT) {
                this.mqttSession.flush();
                return offset;
            }

            MqttPublishMessage publishMessage = buildPublishMessage(persistenceMessage);

            InflightQueue inflightQueue = this.mqttSession.getInflightQueue();
            int inflightIndex = inflightQueue.offer(publishMessage, persistenceMessage.getOffset());
            if (inflightIndex == -1) {
                this.mqttSession.flush();
                return offset;
            }

            long startMillis = System.currentTimeMillis();
            this.mqttSession.publish(publishMessage, num -> {
                long committedOffset = inflightQueue.commit(inflightIndex);
                if (committedOffset == -1) {
                    return;
                }
                commitNextConsumerOffset(committedOffset + 1);
                if (persistenceMessage.isRetained()) {
                    setRetainConsumerOffset(getRetainConsumerOffset() + 1);
                }
                commitRetainConsumerTimestamp(persistenceMessage.getCreateTime());
            }, false);
            long elapsedMillis = System.currentTimeMillis() - startMillis;
            if (elapsedMillis > SLOW_PUBLISH_THRESHOLD_MILLIS) {
                // Use the class logger instead of stdout so the warning follows the
                // application's logging configuration.
                LOGGER.warn("publish busy ,cost: {}", elapsedMillis);
            }
            brokerContext.getEventBus().publish(EventType.PUSH_PUBLISH_MESSAGE, this.mqttSession);

            index++;
            offset++;
        }
    }

    /**
     * Builds the outgoing PUBLISH message for a persisted message using this
     * subscriber's QoS.
     *
     * <p>A packet id is requested from the session only for QoS 1 and QoS 2; for
     * MQTT 5 sessions an empty {@link PublishProperties} is attached.
     */
    private MqttPublishMessage buildPublishMessage(PersistenceMessage persistenceMessage) {
        MqttMessageBuilders.PublishBuilder publishBuilder = MqttMessageBuilders.publish()
                .payload(persistenceMessage.getPayload())
                .qos(this.mqttQoS)
                .topicName(persistenceMessage.getTopic());
        if (this.mqttQoS == MqttQoS.AT_LEAST_ONCE || this.mqttQoS == MqttQoS.EXACTLY_ONCE) {
            publishBuilder.packetId(this.mqttSession.newPacketId());
        }
        MqttPublishMessage publishMessage = publishBuilder.build();
        if (this.mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
            publishMessage.getVariableHeader().setProperties(new PublishProperties());
        }
        return publishMessage;
    }

    /** @return the topic this subscriber is bound to */
    public BrokerTopic getTopic() {
        return this.topic;
    }

    /** @return the session that receives the pushed messages */
    public MqttSession getMqttSession() {
        return this.mqttSession;
    }

    /** @return the QoS applied to messages pushed by this subscriber */
    public MqttQoS getMqttQoS() {
        return this.mqttQoS;
    }

    /** @return the offset of the next message that will be fetched */
    public long getNextConsumerOffset() {
        return this.nextConsumerOffset;
    }

    /**
     * Callback invoked after a published message has been committed by the
     * in-flight queue. Intentionally empty in this class.
     *
     * @param j creation time of the committed message
     */
    public void commitRetainConsumerTimestamp(long j) {
    }

    /**
     * Callback invoked after a published message has been committed by the
     * in-flight queue. Intentionally empty in this class.
     *
     * @param j the committed offset plus one
     */
    public void commitNextConsumerOffset(long j) {
    }

    /** @return the current retain consumer offset */
    public long getRetainConsumerOffset() {
        return this.retainConsumerOffset;
    }

    /**
     * Overwrites the retain consumer offset.
     *
     * @param j the new retain consumer offset
     */
    public void setRetainConsumerOffset(long j) {
        this.retainConsumerOffset = j;
    }

    /** @return the epoch-millis timestamp captured when this instance was created */
    public long getLatestSubscribeTime() {
        return this.latestSubscribeTime;
    }

    /** @return the topic filter token, or {@code null} if none has been set */
    public TopicToken getTopicFilterToken() {
        return this.topicFilterToken;
    }

    /**
     * Stores the topic filter token associated with this subscription.
     *
     * @param topicToken the token to store
     */
    public void setTopicFilterToken(TopicToken topicToken) {
        this.topicFilterToken = topicToken;
    }

    /** @return the value of the ready flag; {@code false} until set otherwise */
    public boolean isReady() {
        return this.ready;
    }

    /**
     * Sets the ready flag.
     *
     * @param z the new flag value
     */
    public void setReady(boolean z) {
        this.ready = z;
    }

    /**
     * @return the topic version recorded the last time a fetch found no message at
     *         the requested offset, or {@code -1} if that has not happened yet
     */
    public int getPushVersion() {
        return this.pushVersion;
    }
}
