package org.axonframework.extensions.amqp.eventhandling.spring;

import com.rabbitmq.client.Channel;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.extensions.amqp.eventhandling.AMQPMessageConverter;
import org.axonframework.extensions.amqp.eventhandling.DefaultAMQPMessageConverter;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;

/* loaded from: input_file:org/axonframework/extensions/amqp/eventhandling/spring/SpringAMQPMessageSource.class */
public class SpringAMQPMessageSource implements ChannelAwareMessageListener, SubscribableMessageSource<EventMessage<?>> {
    private final List<Consumer<List<? extends EventMessage<?>>>> eventProcessors;
    private final AMQPMessageConverter messageConverter;

    public SpringAMQPMessageSource(Serializer serializer) {
        this(DefaultAMQPMessageConverter.builder().serializer(serializer).build());
    }

    public SpringAMQPMessageSource(AMQPMessageConverter aMQPMessageConverter) {
        this.eventProcessors = new CopyOnWriteArrayList();
        this.messageConverter = aMQPMessageConverter;
    }

    public Registration subscribe(Consumer<List<? extends EventMessage<?>>> consumer) {
        this.eventProcessors.add(consumer);
        return () -> {
            return this.eventProcessors.remove(consumer);
        };
    }

    public void onMessage(Message message, Channel channel) {
        if (this.eventProcessors.isEmpty()) {
            return;
        }
        this.messageConverter.readAMQPMessage(message.getBody(), message.getMessageProperties().getHeaders()).ifPresent(eventMessage -> {
            this.eventProcessors.forEach(consumer -> {
                consumer.accept(Collections.singletonList(eventMessage));
            });
        });
    }
}
