package org.kurento.rabbitmq;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.kurento.commons.Address;
import org.kurento.commons.PropertiesManager;
import org.kurento.jsonrpc.message.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:org/kurento/rabbitmq/RabbitMqManager.class */
/**
 * Thin facade over Spring AMQP used to declare queues/exchanges, send
 * notifications, perform request/reply exchanges with retries, and register
 * message listeners on a RabbitMQ broker.
 *
 * <p>{@link #connect()} must be called before any method that declares,
 * sends or listens: the connection factory and the admin helper are only
 * created there. {@link #destroy()} releases every listener container
 * created through this manager and the underlying connection factory.
 *
 * <p>All message bodies are encoded and decoded as UTF-8.
 */
public class RabbitMqManager {
    public static final String RETRY_TIMEOUT_PROPERTY = "rabbit.retryTimeout";
    public static final String NUM_RETRIES_PROPERTY = "rabbit.numRetries";
    public static final String EVENT_QUEUE_PREFIX = "event_";
    public static final String CLIENT_QUEUE_PREFIX = "client_";
    public static final String CLIENT_REPLY_QUEUE_PREFIX = "client_reply_";
    public static final String MEDIA_PIPELINE_QUEUE_PREFIX = "media_pipeline_";
    public static final String PIPELINE_CREATION_QUEUE = "pipeline_creation";

    private static final Logger log = LoggerFactory.getLogger(RabbitMqManager.class);

    /** Per-message expiration set on every request sent by {@code sendAndReceive} (presumably milliseconds). */
    private static final String EXPIRATION_TIME = "25000";

    /** Number of concurrent consumers used by request/response receivers. */
    private static final int RESPONSE_RECEIVER_CONSUMERS = 10;

    /** Reply timeout handed to the templates; read from {@link #RETRY_TIMEOUT_PROPERTY}. */
    private final long retryTimeOut;

    /** Number of re-sends after the first attempt; read from {@link #NUM_RETRIES_PROPERTY}. */
    private final long numRetries;

    private CachingConnectionFactory cf;
    private RabbitAdmin admin;

    /** Every listener container started by this manager, kept so {@link #destroy()} can stop them. */
    private final List<SimpleMessageListenerContainer> containers = new ArrayList<>();

    private final Address address;
    private String username;
    private String password;
    private String vhost;

    /** Callback for one-way messages (notifications) received from a queue. */
    public interface BrokerMessageReceiver {
        /**
         * Invoked with the text of each received message.
         *
         * @param str the message body decoded as text
         */
        void onMessage(String str);
    }

    /** Callback for request messages whose return value is sent back as the reply. */
    public interface BrokerMessageReceiverWithResponse {
        /**
         * Invoked with the text of each received request.
         *
         * @param str the request body decoded as text
         * @return the response text to return to the sender
         */
        String onMessage(String str);
    }

    /**
     * Creates a manager for the broker at the given address, without
     * credentials or virtual host. The retry timeout and the number of retries
     * are read from the {@code rabbit.retryTimeout} (default 500) and
     * {@code rabbit.numRetries} (default 5) properties.
     *
     * @param address host and port of the broker
     */
    public RabbitMqManager(Address address) {
        this.address = address;
        this.retryTimeOut = PropertiesManager.getProperty(RETRY_TIMEOUT_PROPERTY, 500);
        this.numRetries = PropertiesManager.getProperty(NUM_RETRIES_PROPERTY, 5);
    }

    /**
     * Creates a manager for the given broker using explicit credentials.
     *
     * @param str broker host
     * @param str2 broker port, as a decimal string
     * @param str3 user name, or {@code null} to keep the connection factory default
     * @param str4 password, or {@code null} to keep the connection factory default
     * @param str5 virtual host, or {@code null} to keep the connection factory default
     * @throws NumberFormatException if {@code str2} is not a valid integer
     */
    public RabbitMqManager(String str, String str2, String str3, String str4, String str5) {
        this(new Address(str, Integer.parseInt(str2)));
        this.username = str3;
        this.password = str4;
        this.vhost = str5;
    }

    /**
     * Creates the connection factory and admin helper for the configured
     * broker and declares the pipeline-creation queue and exchange. Each call
     * builds a new connection factory.
     */
    public void connect() {
        this.cf = new CachingConnectionFactory(this.address.getHost(), this.address.getPort());
        if (this.username != null) {
            this.cf.setUsername(this.username);
        }
        if (this.password != null) {
            this.cf.setPassword(this.password);
        }
        if (this.vhost != null) {
            this.cf.setVirtualHost(this.vhost);
        }
        this.admin = new RabbitAdmin(this.cf);
        declarePipelineCreationQueue(this.admin);
    }

    /**
     * Declares the {@code pipeline_creation} queue and direct exchange and
     * binds them with an empty routing key.
     */
    private void declarePipelineCreationQueue(RabbitAdmin rabbitAdmin) {
        // Queue(name, durable, exclusive, autoDelete)
        Queue creationQueue = new Queue(PIPELINE_CREATION_QUEUE, true, false, false);
        rabbitAdmin.declareQueue(creationQueue);
        // DirectExchange(name, durable, autoDelete)
        DirectExchange creationExchange = new DirectExchange(PIPELINE_CREATION_QUEUE, true, false);
        rabbitAdmin.declareExchange(creationExchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(creationQueue).to(creationExchange).with(""));
        log.debug("Queue 'pipeline_creation' declared. Exchange 'pipeline_creation' declared.");
    }

    /**
     * Declares a queue with the given name on the broker.
     *
     * @param str name of the queue to declare
     * @return the declared queue
     */
    public Queue declarePipelineQueue(String str) {
        Queue pipelineQueue = new Queue(str);
        this.admin.declareQueue(pipelineQueue);
        return pipelineQueue;
    }

    /**
     * Declares a queue whose name is chosen by the admin helper.
     *
     * @return the declared queue
     */
    public Queue declareClientQueue() {
        return this.admin.declareQueue();
    }

    /**
     * Creates a template that receives its replies on a freshly declared
     * queue. A listener container delivering that queue's messages to the
     * template is started and tracked for {@link #destroy()}.
     *
     * @return the configured template
     */
    public RabbitTemplate createClientTemplate() {
        Queue replyQueue = this.admin.declareQueue();
        RabbitTemplate template = new RabbitTemplate(this.cf);
        template.setReplyTimeout(this.retryTimeOut);
        template.setReplyQueue(replyQueue);

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.cf);
        container.setMessageListener(template);
        startAndTrack(container, replyQueue.getName());

        log.debug("Created RabbitMqTemplate receiving messages in queue: {}", replyQueue.getName());
        return template;
    }

    /**
     * Sends a request and waits for its reply using a temporary template.
     *
     * @param str exchange to publish to
     * @param str2 routing key
     * @param request request to send; must have a session id
     * @return the reply body decoded as UTF-8 text
     * @throws RabbitMqException if no reply arrives after all attempts
     * @see #sendAndReceive(String, String, Request, RabbitTemplate)
     */
    public String sendAndReceive(String str, String str2, Request<? extends Object> request) {
        return sendAndReceive(str, str2, request, null);
    }

    /**
     * Sends a request and waits for its reply, re-sending it when the template
     * returns no reply. The request is sent once plus up to
     * {@code rabbit.numRetries} more times; at least one attempt is always
     * made.
     *
     * @param str exchange to publish to
     * @param str2 routing key
     * @param request request to send; its {@code toString()} is the message
     *        body and its session id and id form the correlation id
     * @param rabbitTemplate template to use, or {@code null} to create a
     *        temporary one with the configured reply timeout
     * @return the reply body decoded as UTF-8 text
     * @throws RabbitMqException if no reply arrives after all attempts
     * @throws AssertionError if the request has no session id
     */
    public String sendAndReceive(String str, String str2, Request<? extends Object> request, RabbitTemplate rabbitTemplate) {
        if (rabbitTemplate == null) {
            rabbitTemplate = new RabbitTemplate(this.cf);
            rabbitTemplate.setReplyTimeout(this.retryTimeOut);
        }
        log.debug("Req-> Exchange:'{}' RoutingKey:'{}' {}", str, str2, request);

        MessageProperties properties = new MessageProperties();
        properties.setExpiration(EXPIRATION_TIME);
        properties.setCorrelationId(encode(calculateCorrelationId(request)));

        // The payload does not change between attempts, so serialize it once.
        byte[] payload = encode(request.toString());

        // A negative retry setting must not prevent the first send.
        long maxAttempts = Math.max(0L, this.numRetries) + 1;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (attempt > 0) {
                log.debug("Retry {} sending message: {}", attempt, request);
            }
            Message reply = rabbitTemplate.sendAndReceive(str, str2, new Message(payload, properties));
            if (reply != null) {
                String response = decode(reply.getBody());
                log.debug("<-Res {}", response.trim());
                return response;
            }
        }
        throw new RabbitMqException("Timeout waiting a reply to message: " + request);
    }

    /**
     * Builds the correlation id {@code <sessionId>/<id>} for a request.
     *
     * @throws AssertionError if the request has no session id
     */
    private String calculateCorrelationId(Request<? extends Object> request) {
        if (request.getSessionId() == null) {
            throw new AssertionError("request without session can't be send with RabbitMqManager");
        }
        return request.getSessionId() + "/" + request.getId();
    }

    /**
     * Creates a plain template on this manager's connection factory.
     *
     * @return a new template
     */
    public RabbitTemplate createServerTemplate() {
        return new RabbitTemplate(this.cf);
    }

    /**
     * Sends a one-way message using a temporary template.
     *
     * @param str exchange to publish to
     * @param str2 routing key
     * @param str3 message text
     */
    public void send(String str, String str2, String str3) {
        send(str, str2, str3, null);
    }

    /**
     * Sends a one-way message, encoded as UTF-8, without waiting for a reply.
     *
     * @param str exchange to publish to
     * @param str2 routing key
     * @param str3 message text
     * @param rabbitTemplate template to use, or {@code null} to create a
     *        temporary one
     */
    public void send(String str, String str2, String str3, RabbitTemplate rabbitTemplate) {
        if (rabbitTemplate == null) {
            rabbitTemplate = new RabbitTemplate(this.cf);
        }
        log.debug("Not-> Exchange:'{}' RoutingKey:'{}' {}", str, str2, str3);
        rabbitTemplate.send(str, str2, new Message(encode(str3), new MessageProperties()));
    }

    /**
     * Declares the direct exchange named {@code "event_" + str}.
     *
     * @param str suffix appended to {@link #EVENT_QUEUE_PREFIX}
     * @return the full name of the declared exchange
     */
    public String declareEventsExchange(String str) {
        String exchangeName = EVENT_QUEUE_PREFIX + str;
        // DirectExchange(name, durable, autoDelete)
        this.admin.declareExchange(new DirectExchange(exchangeName, false, true));
        log.debug("Events exchange '{}' declared.", exchangeName);
        return exchangeName;
    }

    /**
     * Starts a listener on the given queue that forwards every message, as
     * text, to the receiver. The listener is tracked for {@link #destroy()}.
     *
     * @param str name of the queue to listen on
     * @param brokerMessageReceiver callback invoked for each message
     */
    public void addMessageReceiver(final String str, final BrokerMessageReceiver brokerMessageReceiver) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.cf);
        // The adapter invokes the delegate's "onMessage" method by name, so the
        // method names and parameter types below must not change.
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            protected void onMessage(byte[] bArr) {
                onMessage(decode(bArr));
            }

            protected void onMessage(String text) {
                log.debug("<-Not Queue:'{}' {}", str, text.trim());
                brokerMessageReceiver.onMessage(text);
            }
        }, "onMessage"));
        startAndTrack(container, str);
        log.debug("Registered receiver '{}' for queue '{}'", brokerMessageReceiver.getClass().getName(), str);
    }

    /**
     * Starts a listener with 10 concurrent consumers on the given queue. Each
     * message is passed, as text, to the receiver and the receiver's return
     * value is handed back to the listener adapter as the reply. The listener
     * is tracked for {@link #destroy()}.
     *
     * @param str name of the queue to listen on
     * @param brokerMessageReceiverWithResponse callback producing the reply
     */
    public void addMessageReceiverWithResponse(final String str, final BrokerMessageReceiverWithResponse brokerMessageReceiverWithResponse) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.cf);
        container.setConcurrentConsumers(RESPONSE_RECEIVER_CONSUMERS);
        // The adapter invokes the delegate's "onMessage" method by name, so the
        // method name and parameter type below must not change.
        container.setMessageListener(new MessageListenerAdapter(new Object() {
            protected String onMessage(byte[] bArr) {
                String requestText = decode(bArr);
                log.debug("<-Req Queue:'{}' {}", str, requestText);
                String responseText = brokerMessageReceiverWithResponse.onMessage(requestText);
                log.debug("Res-> {}", responseText);
                return responseText;
            }
        }, "onMessage"));
        startAndTrack(container, str);
        log.debug("Registered receiver with response '{}' for queue '{}'",
                brokerMessageReceiverWithResponse.getClass().getName(), str);
    }

    /**
     * Declares a binding between a direct exchange and a queue. Only the
     * binding is declared here; the queue and exchange themselves are not.
     *
     * @param str exchange name
     * @param str2 queue name
     * @param str3 routing key of the binding
     */
    public void bindExchangeToQueue(String str, String str2, String str3) {
        // Queue(name, durable, exclusive, autoDelete)
        Queue boundQueue = new Queue(str2, false, true, true);
        DirectExchange sourceExchange = new DirectExchange(str);
        this.admin.declareBinding(BindingBuilder.bind(boundQueue).to(sourceExchange).with(str3));
        log.debug("Exchange '{}' bind to queue '{}' with routingKey '{}'", str, str2, str3);
    }

    /**
     * Joins two parts into a routing key of the form {@code str/str2}.
     *
     * @param str first part
     * @param str2 second part
     * @return the combined routing key
     */
    public String createRoutingKey(String str, String str2) {
        return str + "/" + str2;
    }

    /**
     * Destroys every listener container started through this manager and then
     * the connection factory. Safe to call even if {@link #connect()} was
     * never invoked.
     */
    public void destroy() {
        for (SimpleMessageListenerContainer container : this.containers) {
            container.destroy();
        }
        this.containers.clear();
        if (this.cf != null) {
            this.cf.destroy();
        }
    }

    /** Subscribes the container to a single queue, starts it and remembers it for {@link #destroy()}. */
    private void startAndTrack(SimpleMessageListenerContainer container, String queueName) {
        container.setQueueNames(new String[] {queueName});
        container.start();
        this.containers.add(container);
    }

    /** Encodes text as UTF-8 so both ends agree regardless of the platform default charset. */
    private static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a UTF-8 message body into text. */
    private static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
