package org.smartboot.mqtt.broker.eventbus.messagebus;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.eventbus.messagebus.consumer.Consumer;
import org.smartboot.mqtt.common.message.MqttPublishMessage;

/* loaded from: input_file:org/smartboot/mqtt/broker/eventbus/messagebus/MessageBusSubscriber.class */
/**
 * {@link MessageBus} implementation that keeps a list of registered {@link Consumer}s and
 * forwards every published message to each of them.
 *
 * <p>Consumers are invoked in the order in which they were registered. The backing list is a
 * plain {@link ArrayList} and this class performs no synchronization of its own.
 */
public class MessageBusSubscriber implements MessageBus {
    /** Registered consumers, in registration order. */
    private final List<Consumer> messageBuses = new ArrayList<>();

    /**
     * Registers a consumer that receives every message passed to
     * {@link #consume(BrokerContext, MqttPublishMessage)}.
     *
     * @param consumer the consumer to register
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    @Override
    public void consumer(Consumer consumer) {
        // Fail at registration time: a null entry would otherwise only surface during dispatch
        // and abort delivery to every consumer registered after it.
        Objects.requireNonNull(consumer, "consumer");
        this.messageBuses.add(consumer);
    }

    /**
     * Registers a consumer that only receives the messages accepted by {@code predicate}.
     *
     * <p>The consumer is wrapped in a filtering consumer; the predicate is evaluated once per
     * dispatched message.
     *
     * @param consumer  the consumer to invoke for accepted messages
     * @param predicate the filter deciding whether a message is handed to {@code consumer}
     * @throws NullPointerException if {@code consumer} or {@code predicate} is {@code null}
     */
    @Override
    public void consumer(Consumer consumer, Predicate<MqttPublishMessage> predicate) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(predicate, "predicate");
        consumer((brokerContext, mqttPublishMessage) -> {
            if (predicate.test(mqttPublishMessage)) {
                consumer.consume(brokerContext, mqttPublishMessage);
            }
        });
    }

    /**
     * Dispatches the message to every registered consumer, in registration order.
     *
     * <p>An exception thrown by a consumer is not caught here, so it propagates to the caller
     * and the remaining consumers are not invoked for this message.
     *
     * @param brokerContext      the broker context handed unchanged to each consumer
     * @param mqttPublishMessage the published message to deliver
     */
    @Override
    public void consume(BrokerContext brokerContext, MqttPublishMessage mqttPublishMessage) {
        this.messageBuses.forEach(registered -> registered.consume(brokerContext, mqttPublishMessage));
    }
}
