package org.smartboot.mqtt.broker.eventbus.messagebus;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.smartboot.mqtt.broker.BrokerContext;
import org.smartboot.mqtt.broker.BrokerTopic;
import org.smartboot.mqtt.broker.Message;
import org.smartboot.mqtt.broker.eventbus.EventObject;
import org.smartboot.mqtt.common.eventbus.EventType;
import org.smartboot.mqtt.common.message.MqttPublishMessage;

/* loaded from: input_file:org/smartboot/mqtt/broker/eventbus/messagebus/MessageBusSubscriber.class */
public class MessageBusSubscriber implements MessageBus {
    private final BrokerContext brokerContext;
    private final List<Consumer<Message>> messageBuses = new ArrayList();

    public MessageBusSubscriber(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    public void subscribe(EventType<EventObject<MqttPublishMessage>> eventType, EventObject<MqttPublishMessage> eventObject) {
        BrokerTopic orCreateTopic = this.brokerContext.getOrCreateTopic(eventObject.getObject().getVariableHeader().getTopicName());
        Message message = new Message(eventObject.getObject());
        try {
            this.brokerContext.getProviders().getPersistenceProvider().doSave(message);
            this.brokerContext.getMessageBus().producer(message);
            this.brokerContext.batchPublish(orCreateTopic);
        } catch (Throwable th) {
            this.brokerContext.batchPublish(orCreateTopic);
            throw th;
        }
    }

    @Override // org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus
    public void consumer(Consumer<Message> consumer) {
        consumer(consumer, message -> {
            return true;
        });
    }

    @Override // org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus
    public void consumer(Consumer<Message> consumer, Predicate<Message> predicate) {
        this.messageBuses.add(message -> {
            if (predicate.test(message)) {
                consumer.accept(message);
            }
        });
    }

    @Override // org.smartboot.mqtt.broker.eventbus.messagebus.MessageBus
    public void producer(Message message) {
        this.messageBuses.forEach(consumer -> {
            consumer.accept(message);
        });
    }

    public /* bridge */ /* synthetic */ void subscribe(EventType eventType, Object obj) {
        subscribe((EventType<EventObject<MqttPublishMessage>>) eventType, (EventObject<MqttPublishMessage>) obj);
    }
}
