package org.smartboot.mqtt.broker.eventbus.messagebus;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.mqtt.broker.MqttSession;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.Consumer;
import org.smartboot.mqtt.common.message.MqttPublishMessage;

/* loaded from: input_file:org/smartboot/mqtt/broker/eventbus/messagebus/MessageBus.class */
public class MessageBus {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MessageBus.class);
    private final List<Consumer> messageBuses = new ArrayList();

    public void consumer(Consumer consumer) {
        this.messageBuses.add(consumer);
    }

    public void consumer(Consumer consumer, Predicate<Message> predicate) {
        consumer((mqttSession, message) -> {
            if (predicate.test(message)) {
                consumer.consume(mqttSession, message);
            }
        });
    }

    public void publish(MqttSession mqttSession, MqttPublishMessage mqttPublishMessage) {
        Message message = new Message(mqttPublishMessage);
        boolean z = false;
        for (Consumer consumer : this.messageBuses) {
            try {
                if (consumer.enable()) {
                    consumer.consume(mqttSession, message);
                } else {
                    z = true;
                }
            } catch (Throwable th) {
                LOGGER.info("messageBus conumse exception", th);
            }
        }
        if (z) {
            this.messageBuses.removeIf(consumer2 -> {
                return !consumer2.enable();
            });
        }
    }
}
