package org.duracloud.common.queue.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.duracloud.common.error.DuraCloudRuntimeException;
import org.duracloud.common.queue.TaskException;
import org.duracloud.common.queue.TaskNotFoundException;
import org.duracloud.common.queue.TaskQueue;
import org.duracloud.common.queue.TimeoutException;
import org.duracloud.common.queue.task.Task;
import org.duracloud.common.retry.Retriable;
import org.duracloud.common.retry.Retrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/duracloud/common/queue/rabbitmq/RabbitMQTaskQueue.class */
public class RabbitMQTaskQueue implements TaskQueue {
    private static Logger log = LoggerFactory.getLogger(RabbitMQTaskQueue.class);
    private Channel mqChannel;
    private String queueName;
    private Integer visibilityTimeout = -1;
    private Integer unAcknowlededMesageCount = 0;
    private String queueUrl;
    private String exchangeName;

    /* loaded from: input_file:org/duracloud/common/queue/rabbitmq/RabbitMQTaskQueue$MsgProp.class */
    public enum MsgProp {
        DELIVERY_TAG,
        ROUTING_KEY,
        EXCHANGE
    }

    public RabbitMQTaskQueue(String str, Integer num, String str2, String str3, String str4, String str5, String str6) {
        try {
            this.exchangeName = str3;
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername(str4);
            connectionFactory.setPassword(str5);
            connectionFactory.setVirtualHost(str2);
            connectionFactory.setHost(str);
            connectionFactory.setPort(num.intValue());
            Connection newConnection = connectionFactory.newConnection();
            this.mqChannel = newConnection.createChannel();
            this.mqChannel.queueBind(str6, this.exchangeName, str6);
            this.queueUrl = "(RabbitMQ) " + newConnection.getAddress();
            this.queueName = str6;
        } catch (Exception e) {
            log.error("Failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", str6, this.queueUrl, e.getMessage());
            throw new DuraCloudRuntimeException(e);
        }
    }

    public RabbitMQTaskQueue(Connection connection, String str, String str2) {
        try {
            this.exchangeName = str;
            this.mqChannel = connection.createChannel();
            this.mqChannel.queueBind(str2, str, str2);
            this.queueUrl = "(RabbitMQ) " + connection.getAddress();
            this.queueName = str2;
        } catch (Exception e) {
            log.error("Failed to estabilish connection to RabbitMQ with queue name {} and URL {} because {}", str2, this.queueUrl, e.getMessage());
            throw new DuraCloudRuntimeException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public String getName() {
        return this.queueName;
    }

    protected Task marshallTask(byte[] bArr, long j, String str, String str2) {
        Properties properties = new Properties();
        Task task = null;
        try {
            properties.load(new StringReader(new String(bArr)));
            if (properties.containsKey("type")) {
                task = new Task();
                for (String str3 : properties.stringPropertyNames()) {
                    if (str3.equals("type")) {
                        task.setType(Task.Type.valueOf(properties.getProperty(str3)));
                    } else {
                        task.addProperty(str3, properties.getProperty(str3));
                    }
                }
                task.addProperty(MsgProp.DELIVERY_TAG.name(), String.valueOf(j));
                task.addProperty(MsgProp.ROUTING_KEY.name(), str);
                task.addProperty(MsgProp.EXCHANGE.name(), str2);
            } else {
                log.error("RabbitMQ message from queue: " + this.queueName + " at " + this.queueUrl + ", does not contain a 'task type'");
            }
        } catch (IOException e) {
            log.error("Error creating Task", (Throwable) e);
        }
        return task;
    }

    protected String unmarshallTask(Task task) {
        Properties properties = new Properties();
        properties.setProperty("type", task.getType().name());
        for (String str : task.getProperties().keySet()) {
            String property = task.getProperty(str);
            if (null != property) {
                properties.setProperty(str, property);
            }
        }
        StringWriter stringWriter = new StringWriter();
        String str2 = null;
        try {
            properties.store(stringWriter, (String) null);
            str2 = stringWriter.toString();
        } catch (IOException e) {
            log.error("Error unmarshalling Task, queue: " + this.queueName + ", msgBody: " + str2, (Throwable) e);
        }
        return str2;
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Task task) {
        try {
            final String str = this.queueName;
            final byte[] bytes = unmarshallTask(task).getBytes();
            new Retrier(4, 10000, 2).execute(new Retriable() { // from class: org.duracloud.common.queue.rabbitmq.RabbitMQTaskQueue.1
                @Override // org.duracloud.common.retry.Retriable
                public Object retry() throws Exception {
                    RabbitMQTaskQueue.this.mqChannel.basicPublish(RabbitMQTaskQueue.this.exchangeName, str, null, bytes);
                    return null;
                }
            });
            Integer num = this.unAcknowlededMesageCount;
            this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount.intValue() + 1);
            log.info("RabbitMQ message successfully placed {} on queue - queue: {}", task, str);
        } catch (Exception e) {
            log.error("failed to place {} on {} at {} due to {}", task, this.queueName, this.queueUrl, e.getMessage());
            throw new DuraCloudRuntimeException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Task... taskArr) {
        for (Task task : taskArr) {
            put(task);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void put(Set<Task> set) {
        Iterator<Task> it = set.iterator();
        while (it.hasNext()) {
            put(it.next());
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Set<Task> take(int i) throws TimeoutException {
        Integer size = size();
        if (size.intValue() <= 0) {
            throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
        }
        if (size.intValue() < i) {
            size = Integer.valueOf(i);
        }
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < size.intValue(); i2++) {
            try {
                hashSet.add(take());
            } catch (Exception e) {
                Iterator it = hashSet.iterator();
                while (it.hasNext()) {
                    requeue((Task) it.next());
                }
                throw new TimeoutException("Failed to get at least one message from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
            }
        }
        return hashSet;
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Task take() throws TimeoutException {
        if (size().intValue() <= 0) {
            throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
        }
        try {
            GetResponse basicGet = this.mqChannel.basicGet(this.queueName, false);
            if (basicGet == null) {
                throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
            }
            AMQP.BasicProperties props = basicGet.getProps();
            Envelope envelope = basicGet.getEnvelope();
            byte[] body = basicGet.getBody();
            String routingKey = envelope.getRoutingKey();
            String exchange = envelope.getExchange();
            long deliveryTag = envelope.getDeliveryTag();
            log.info("RabbitMQ message received - queue: {}, queueUrl: {}, deliveryTag: {}, preworkQueueTime: {}", this.queueName, this.queueUrl, Long.valueOf(deliveryTag), DurationFormatUtils.formatDuration(Long.valueOf(System.currentTimeMillis() - Long.valueOf(props.getTimestamp().getTime()).longValue()).longValue(), "HH:mm:ss,SSS"));
            Task marshallTask = marshallTask(body, deliveryTag, routingKey, exchange);
            marshallTask.setVisibilityTimeout(this.visibilityTimeout);
            return marshallTask;
        } catch (Exception e) {
            log.error("failed to take task from " + this.queueName + " due to " + e.getMessage(), (Throwable) e);
            throw new TimeoutException("No tasks available from queue: " + this.queueName + ", queueUrl: " + this.queueUrl);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void extendVisibilityTimeout(Task task) throws TaskNotFoundException {
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void deleteTask(Task task) throws TaskNotFoundException {
        try {
            this.mqChannel.basicAck(Long.parseLong(task.getProperty(MsgProp.DELIVERY_TAG.name())), false);
            log.info("successfully deleted {}", task);
            Integer num = this.unAcknowlededMesageCount;
            this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount.intValue() - 1);
        } catch (Exception e) {
            log.error("failed to delete task " + task + ": " + e.getMessage(), (Throwable) e);
            throw new TaskNotFoundException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void deleteTasks(Set<Task> set) throws TaskException {
        if (set.size() > 10) {
            throw new IllegalArgumentException("task set must contain 10 or fewer tasks");
        }
        try {
            Iterator<Task> it = set.iterator();
            while (it.hasNext()) {
                deleteTask(it.next());
            }
        } catch (Exception e) {
            log.error("failed to batch delete tasks " + set + ": " + e.getMessage(), (Throwable) e);
            throw new TaskException(e);
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public void requeue(Task task) {
        int attempts = task.getAttempts();
        task.incrementAttempts();
        try {
            this.mqChannel.basicReject(Long.parseLong(task.getProperty(MsgProp.DELIVERY_TAG.name())), true);
            Integer num = this.unAcknowlededMesageCount;
            this.unAcknowlededMesageCount = Integer.valueOf(this.unAcknowlededMesageCount.intValue() - 1);
        } catch (Exception e) {
            log.error("unable to reject message {}, re-put message instead ", task);
            put(task);
        }
        log.warn("requeued {} after {} failed attempts.", task, Integer.valueOf(attempts));
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Integer size() {
        try {
            return Integer.valueOf(Long.valueOf(this.mqChannel.messageCount(this.queueName)).intValue());
        } catch (Exception e) {
            return 0;
        }
    }

    @Override // org.duracloud.common.queue.TaskQueue
    public Integer sizeIncludingInvisibleAndDelayed() {
        return Integer.valueOf(size().intValue() + this.unAcknowlededMesageCount.intValue());
    }

    private Integer getVisibilityTimeout() {
        return this.visibilityTimeout;
    }

    private String getQueueUrl() {
        return this.queueUrl;
    }
}
