package org.smartboot.mqtt.broker.topic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.eventbus.messagebus.Message;
import org.smartboot.mqtt.common.AsyncTask;
import org.smartboot.mqtt.common.TopicToken;

/* loaded from: input_file:org/smartboot/mqtt/broker/topic/BrokerTopic.class */
public class BrokerTopic {
    private final SubscriberGroup defaultGroup;
    private final Map<String, SubscriberGroup> shareSubscribers;
    private final Semaphore semaphore;
    private final TopicToken topicToken;
    private final ExecutorService executorService;
    private boolean enabled;
    private final AtomicInteger version;
    private final AsyncTask asyncTask;
    private Message retainMessage;
    private final MessageQueue messageQueue;
    private final ConcurrentLinkedQueue<AbstractConsumerRecord> queue;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BrokerTopic.class);
    private static final AbstractConsumerRecord BREAK = new AbstractConsumerRecord(null, null, -1) { // from class: org.smartboot.mqtt.broker.topic.BrokerTopic.2
        @Override // org.smartboot.mqtt.broker.topic.AbstractConsumerRecord
        public void pushToClient() {
            throw new UnsupportedOperationException();
        }
    };

    public BrokerTopic(String str) {
        this(str, null);
    }

    public BrokerTopic(String str, ExecutorService executorService) {
        this.defaultGroup = new SubscriberGroup();
        this.shareSubscribers = new ConcurrentHashMap();
        this.semaphore = new Semaphore(1);
        this.enabled = true;
        this.version = new AtomicInteger();
        this.asyncTask = new AsyncTask() { // from class: org.smartboot.mqtt.broker.topic.BrokerTopic.1
            @Override // org.smartboot.mqtt.common.AsyncTask
            public void execute() {
                BrokerTopic.this.queue.offer(BrokerTopic.BREAK);
                int i = BrokerTopic.this.version.get();
                while (true) {
                    AbstractConsumerRecord abstractConsumerRecord = (AbstractConsumerRecord) BrokerTopic.this.queue.poll();
                    if (abstractConsumerRecord == BrokerTopic.BREAK) {
                        break;
                    }
                    try {
                        abstractConsumerRecord.pushToClient();
                    } catch (Exception e) {
                        BrokerTopic.LOGGER.error("batch publish exception:{}", e.getMessage(), e);
                    }
                }
                BrokerTopic.this.semaphore.release();
                if (i == BrokerTopic.this.version.get() || BrokerTopic.this.queue.isEmpty()) {
                    return;
                }
                BrokerTopic.this.push();
            }
        };
        this.messageQueue = new MemoryMessageStoreQueue();
        this.queue = new ConcurrentLinkedQueue<>();
        this.topicToken = new TopicToken(str);
        this.executorService = executorService;
    }

    public SubscriberGroup getSubscriberGroup(TopicToken topicToken) {
        return topicToken.isShared() ? this.shareSubscribers.computeIfAbsent(topicToken.getTopicFilter(), str -> {
            return new SubscriberSharedGroup(topicToken, this);
        }) : this.defaultGroup;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeShareGroup(String str) {
        this.shareSubscribers.remove(str);
    }

    public void addSubscriber(AbstractConsumerRecord abstractConsumerRecord) {
        this.queue.offer(abstractConsumerRecord);
    }

    public TopicToken getTopicToken() {
        return this.topicToken;
    }

    public String getTopic() {
        return this.topicToken.getTopicFilter();
    }

    public Message getRetainMessage() {
        return this.retainMessage;
    }

    public void setRetainMessage(Message message) {
        this.retainMessage = message;
    }

    public MessageQueue getMessageQueue() {
        return this.messageQueue;
    }

    public String toString() {
        return getTopic();
    }

    public void push() {
        if (this.enabled && this.semaphore.tryAcquire()) {
            this.executorService.execute(this.asyncTask);
        }
    }

    public AtomicInteger getVersion() {
        return this.version;
    }

    public void disable() {
        this.enabled = false;
    }
}
