package org.iris_events.consumer;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerShutdownSignalCallback;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.io.IOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.iris_events.common.Exchanges;
import org.iris_events.consumer.QueueDeclarator;
import org.iris_events.exception.IrisConnectionException;
import org.iris_events.runtime.InstanceInfoProvider;
import org.iris_events.runtime.QueueNameProvider;
import org.iris_events.runtime.channel.ChannelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:org/iris_events/consumer/FrontendEventConsumer.class */
public class FrontendEventConsumer implements RecoveryListener {
    private static final Logger log = LoggerFactory.getLogger(FrontendEventConsumer.class);
    private static final int DEFAULT_MESSAGE_TTL = 15000;
    private final ChannelService channelService;
    private final InstanceInfoProvider instanceInfoProvider;
    private final QueueDeclarator queueDeclarator;
    private final String queueName;
    private final ConcurrentHashMap<String, DeliverCallback> deliverCallbackMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, DeliverCallbackProvider> deliverCallbackProviderMap = new ConcurrentHashMap<>();
    private String channelId = UUID.randomUUID().toString();

    @Inject
    public FrontendEventConsumer(@Named("consumerChannelService") ChannelService channelService, InstanceInfoProvider instanceInfoProvider, QueueDeclarator queueDeclarator, QueueNameProvider queueNameProvider) {
        this.channelService = channelService;
        this.instanceInfoProvider = instanceInfoProvider;
        this.queueDeclarator = queueDeclarator;
        this.queueName = queueNameProvider.getFrontendQueueName();
    }

    public void addDeliverCallbackProvider(String str, DeliverCallbackProvider deliverCallbackProvider) {
        this.deliverCallbackProviderMap.put(str, deliverCallbackProvider);
    }

    public void initChannel() {
        try {
            Recoverable orCreateChannelById = this.channelService.getOrCreateChannelById(this.channelId);
            String str = this.queueName;
            HashMap hashMap = new HashMap();
            hashMap.put("x-message-ttl", Integer.valueOf(DEFAULT_MESSAGE_TTL));
            this.queueDeclarator.declareQueueWithRecreateOnConflict(orCreateChannelById, new QueueDeclarator.QueueDeclarationDetails(str, true, false, false, hashMap));
            setupDeliverCallbacks(orCreateChannelById);
            orCreateChannelById.basicConsume(str, false, getDeliverCallback(), getCancelCallback(), getShutdownCallback());
            if (orCreateChannelById instanceof Recoverable) {
                orCreateChannelById.addRecoveryListener(this);
            }
        } catch (IOException e) {
            log.error("Could not initialize frontend consumer", e);
            throw new IrisConnectionException("Could not initialize frontend consumer", e);
        }
    }

    private void setupDeliverCallbacks(Channel channel) {
        this.deliverCallbackProviderMap.forEach((str, deliverCallbackProvider) -> {
            try {
                channel.queueBind(this.queueName, Exchanges.FRONTEND.getValue(), str);
                this.deliverCallbackMap.put(str, deliverCallbackProvider.createDeliverCallback(channel));
            } catch (IOException e) {
                String format = String.format("Could not setup deliver callback for routing key = %s", str);
                log.error(format);
                throw new IrisConnectionException(format, e);
            }
        });
    }

    private DeliverCallback getDeliverCallback() {
        return (str, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            DeliverCallback deliverCallback = this.deliverCallbackMap.get(routingKey);
            if (deliverCallback != null) {
                deliverCallback.handle(str, delivery);
            } else {
                log.warn(String.format("No handler registered for frontend message with routingKey = %s, NACK-ing message", routingKey));
                this.channelService.getOrCreateChannelById(this.channelId).basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
            }
        };
    }

    private CancelCallback getCancelCallback() {
        return str -> {
            log.warn("Channel canceled for {}", this.instanceInfoProvider.getApplicationName() + " frontend queue");
        };
    }

    private ConsumerShutdownSignalCallback getShutdownCallback() {
        return (str, shutdownSignalException) -> {
            log.warn("Channel shut down for with signal:{}, queue: {}, consumer: {}", new Object[]{shutdownSignalException, this.queueName, str});
            try {
                this.channelService.removeChannel(this.channelId);
                this.channelId = UUID.randomUUID().toString();
                initChannel();
            } catch (IOException e) {
                log.error(String.format("Could not re-initialize channel for queue %s", this.queueName), e);
            }
        };
    }

    public void handleRecovery(Recoverable recoverable) {
        log.info("handleRecovery called for frontend consumer for queue {}", this.queueName);
        initChannel();
    }

    public void handleRecoveryStarted(Recoverable recoverable) {
        log.info("handleRecoveryStarted called for frontend consumer for queue {}", this.queueName);
    }
}
