package io.accelerate.events.interop.queue.connector;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/accelerate/events/interop/queue/connector/SqsEventQueue.class */
public class SqsEventQueue {
    /** Logger used by this class and by the nested message-processing thread. */
    private static final Logger log = LoggerFactory.getLogger(SqsEventQueue.class);
    /** Key of the SQS message attribute that carries the event name. */
    private static final String ATTRIBUTE_EVENT_NAME = "name";
    /** Key of the SQS message attribute that carries the event version. */
    private static final String ATTRIBUTE_EVENT_VERSION = "version";
    /** SQS client used for sending, for queue attributes, and handed to the processing thread. */
    private final AmazonSQS client;
    /** URL of the queue this instance sends to and reads from. */
    private final String queueUrl;
    /** Jackson mapper: serializes outgoing events and is passed to the processing thread. */
    private final ObjectMapper mapper = new ObjectMapper();
    /** Thread created by subscribeToMessages(); stays null until the first subscription. */
    private MessageProcessingThread messageProcessingThread;

    /* loaded from: input_file:io/accelerate/events/interop/queue/connector/SqsEventQueue$MessageProcessingThread.class */
    static class MessageProcessingThread extends Thread {
        /** Batch size requested per receive call; also passed as the worker pool's maximum size. */
        static final int MAX_NUMBER_OF_MESSAGES = 10;
        /** Wait time, in seconds, set on the receive request. */
        static final int MAX_AWS_WAIT = 20;
        /** SQS client used to receive and delete messages. */
        private final AmazonSQS client;
        /** Mapper used to deserialize message bodies into event objects. */
        private final ObjectMapper mapper;
        /** Pool on which individual messages of a batch are processed. */
        private final ExecutorService executorService;
        /** Reused delete request; its entries are replaced after every batch. */
        private final DeleteMessageBatchRequest deleteMessageBatchRequest;
        /** Loop flag: run() keeps polling while this is true; signalStop() clears it. */
        private final AtomicBoolean shouldContinue;
        /** Handler registry consulted for every received message. */
        private final QueueEventHandlers eventHandlers;
        // NOTE(review): this field is not read anywhere in the visible code.
        private final int maxProcessingThreads = MAX_NUMBER_OF_MESSAGES;
        /** Reused receive request; configured once in the constructor. */
        private final ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();

        MessageProcessingThread(AmazonSQS amazonSQS, String str, ObjectMapper objectMapper, QueueEventHandlers queueEventHandlers) {
            this.client = amazonSQS;
            this.mapper = objectMapper;
            this.eventHandlers = queueEventHandlers;
            this.receiveMessageRequest.setMaxNumberOfMessages(Integer.valueOf(MAX_NUMBER_OF_MESSAGES));
            this.receiveMessageRequest.setQueueUrl(str);
            this.receiveMessageRequest.setWaitTimeSeconds(Integer.valueOf(MAX_AWS_WAIT));
            this.receiveMessageRequest.setMessageAttributeNames(Arrays.asList(SqsEventQueue.ATTRIBUTE_EVENT_NAME, SqsEventQueue.ATTRIBUTE_EVENT_VERSION));
            this.deleteMessageBatchRequest = new DeleteMessageBatchRequest();
            this.deleteMessageBatchRequest.setQueueUrl(str);
            this.executorService = new ThreadPoolExecutor(1, MAX_NUMBER_OF_MESSAGES, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), Executors.defaultThreadFactory());
            this.shouldContinue = new AtomicBoolean(true);
        }

        /**
         * Polls the queue batch after batch until the continue flag is cleared.
         *
         * <p>An exception thrown by {@code processBatch()} is not caught here, so it ends the loop
         * and the thread.
         */
        @Override
        public void run() {
            while (shouldContinue.get()) {
                processBatch();
            }
        }

        /**
         * Asks the polling loop to stop and shuts down the resources it uses.
         *
         * <p>Clears the continue flag, shuts down the SQS client, then shuts down the worker pool
         * and waits up to 30 seconds for already-submitted tasks to finish.
         *
         * <p>NOTE(review): the client is the same instance the enclosing SqsEventQueue was
         * constructed with (subscribeToMessages passes its own client), so it is shut down for the
         * queue object as well — confirm this is intended.
         *
         * @throws InterruptedException if interrupted while waiting for the worker pool to terminate
         */
        void signalStop() throws InterruptedException {
            this.shouldContinue.set(false);
            // Presumably done first so a pending long-poll receive is aborted — TODO confirm.
            this.client.shutdown();
            this.executorService.shutdown();
            // NOTE(review): the boolean result (terminated vs. timed out) is discarded.
            this.executorService.awaitTermination(30L, TimeUnit.SECONDS);
        }

        /**
         * Receives one batch of messages, processes each of them on the worker pool, and deletes
         * from the queue those that were processed successfully.
         *
         * <p>A message whose processing fails is logged and left out of the delete request, so it
         * stays on the queue. If this thread is interrupted while waiting for results, the wait
         * continues until every submitted message has completed; the interrupt status is restored
         * before the method returns, so the caller still observes it.
         */
        void processBatch() {
            List<Message> messages = this.client.receiveMessage(this.receiveMessageRequest).getMessages();
            ExecutorCompletionService<Message> completionService =
                    new ExecutorCompletionService<>(this.executorService);
            for (Message message : messages) {
                completionService.submit(() -> process(message));
            }

            boolean interrupted = false;
            try {
                List<Message> processed = new ArrayList<>(messages.size());
                int outstanding = messages.size();
                while (outstanding > 0) {
                    try {
                        processed.add(completionService.take().get());
                        outstanding--;
                    } catch (InterruptedException e) {
                        // take() consumed nothing, so the slot is still outstanding: keep waiting
                        // and hand the interrupt back once the batch is settled.
                        interrupted = true;
                    } catch (Exception e) {
                        SqsEventQueue.log.error("Failed to process queue message", e);
                        outstanding--;
                    }
                }

                if (!processed.isEmpty()) {
                    List<DeleteMessageBatchRequestEntry> entries = processed.stream()
                            .map(done -> new DeleteMessageBatchRequestEntry(done.getMessageId(), done.getReceiptHandle()))
                            .collect(Collectors.toList());
                    this.deleteMessageBatchRequest.setEntries(entries);
                    this.client.deleteMessageBatch(this.deleteMessageBatchRequest);
                }
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Validates, deserializes and dispatches a single message.
         *
         * <p>The event name and version are read from the message attributes, the matching handle
         * rule is looked up, the body is deserialized into the rule's type, and the event is passed
         * to the before-inspector, the rule's consumer and the after-inspector, in that order.
         *
         * @param message the received SQS message
         * @return the same message, once it has been handled successfully
         * @throws EventDeserializationException if the name or version attribute is missing, or the
         *     body cannot be deserialized into the rule's type
         * @throws EventProcessingException if no handler is registered for the name/version pair,
         *     or an inspector or the consumer throws
         */
        private Message process(Message message) throws EventDeserializationException, EventProcessingException {
            var attributes = message.getMessageAttributes();

            // One lookup per attribute; a key mapped to null is reported like a missing key.
            MessageAttributeValue nameAttribute = attributes.get(SqsEventQueue.ATTRIBUTE_EVENT_NAME);
            if (nameAttribute == null) {
                throw new EventDeserializationException("Message does not contain the name attribute");
            }
            MessageAttributeValue versionAttribute = attributes.get(SqsEventQueue.ATTRIBUTE_EVENT_VERSION);
            if (versionAttribute == null) {
                throw new EventDeserializationException("Message does not contain the version attribute");
            }

            // Parameterized so the text is only built when debug logging is enabled.
            SqsEventQueue.log.debug("Attributes for message:{} -> {}", message.getBody(), attributes.entrySet());

            String eventName = nameAttribute.getStringValue();
            String eventVersion = versionAttribute.getStringValue();
            if (!this.eventHandlers.canHandle(eventName, eventVersion)) {
                throw new EventProcessingException("No event handlers for: " + eventName + ", version: " + eventVersion);
            }

            HandleRule handleRule = this.eventHandlers.getHandleRuleFor(eventName, eventVersion);
            try {
                Object event = this.mapper.readerFor(handleRule.getType()).readValue(message.getBody());
                this.eventHandlers.getBeforeEventInspector().inspect(eventName, eventVersion, event);
                handleRule.getConsumer().accept(event);
                this.eventHandlers.getAfterEventInspector().inspect(eventName, eventVersion, event);
                return message;
            } catch (IOException e) {
                throw new EventDeserializationException("Cannot deserialize message into an instance of " + String.valueOf(handleRule.getType()) + "." + message.getBody(), e);
            } catch (Exception e) {
                throw new EventProcessingException("Exception while consuming individual message: " + message.getBody(), e);
            }
        }
    }

    /* loaded from: input_file:io/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg.class */
    private static final class asdasdg extends Record {
        private final GetQueueAttributesResult queueAttributes;
        private final String queueAttribute;

        private asdasdg(GetQueueAttributesResult getQueueAttributesResult, String str) {
            this.queueAttributes = getQueueAttributesResult;
            this.queueAttribute = str;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, asdasdg.class), asdasdg.class, "queueAttributes;queueAttribute", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttributes:Lcom/amazonaws/services/sqs/model/GetQueueAttributesResult;", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttribute:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, asdasdg.class), asdasdg.class, "queueAttributes;queueAttribute", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttributes:Lcom/amazonaws/services/sqs/model/GetQueueAttributesResult;", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttribute:Ljava/lang/String;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, asdasdg.class, Object.class), asdasdg.class, "queueAttributes;queueAttribute", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttributes:Lcom/amazonaws/services/sqs/model/GetQueueAttributesResult;", "FIELD:Lio/accelerate/events/interop/queue/connector/SqsEventQueue$asdasdg;->queueAttribute:Ljava/lang/String;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public GetQueueAttributesResult queueAttributes() {
            return this.queueAttributes;
        }

        public String queueAttribute() {
            return this.queueAttribute;
        }
    }

    /**
     * Creates a queue wrapper bound to one SQS queue.
     *
     * @param amazonSQS client used for all queue operations
     * @param str URL of the queue
     */
    public SqsEventQueue(AmazonSQS amazonSQS, String str) {
        queueUrl = str;
        client = amazonSQS;
    }

    /**
     * Returns the URL of the queue this instance was created with.
     *
     * @return the queue URL
     */
    public String getQueueUrl() {
        return queueUrl;
    }

    /**
     * Reads the three approximate message counters of the queue.
     *
     * <p>Each counter is fetched with its own request, so the three values are not taken from a
     * single snapshot.
     *
     * @return a {@code QueueSize} built from the {@code ApproximateNumberOfMessages},
     *     {@code ApproximateNumberOfMessagesNotVisible} and
     *     {@code ApproximateNumberOfMessagesDelayed} attributes, in that order
     * @throws NumberFormatException if an attribute value is not a parsable integer
     */
    public QueueSize getQueueSize() {
        int visibleCount = Integer.parseInt(getQueueAttribute("ApproximateNumberOfMessages"));
        int notVisibleCount = Integer.parseInt(getQueueAttribute("ApproximateNumberOfMessagesNotVisible"));
        int delayedCount = Integer.parseInt(getQueueAttribute("ApproximateNumberOfMessagesDelayed"));
        return new QueueSize(visibleCount, notVisibleCount, delayedCount);
    }

    /**
     * Fetches a single attribute of the queue.
     *
     * @param str name of the attribute to request
     * @return the attribute value from the response, or {@code null} if the response does not
     *     contain it
     */
    private String getQueueAttribute(String str) {
        GetQueueAttributesResult response =
                client.getQueueAttributes(queueUrl, Collections.singletonList(str));
        return response.getAttributes().get(str);
    }

    /**
     * Sends an event using the default timeouts (10000 for the client execution timeout and 5000
     * for the request timeout).
     *
     * @param obj the event to send; its class must be annotated with {@code QueueEvent}
     * @throws EventSerializationException if the class is not a {@code QueueEvent} or the event
     *     cannot be serialized
     * @throws EventProcessingException if the SQS client fails to send the message
     */
    public void send(Object obj) throws EventSerializationException, EventProcessingException {
        final int defaultClientExecutionTimeout = 10000;
        final int defaultRequestTimeout = 5000;
        send(obj, defaultClientExecutionTimeout, defaultRequestTimeout);
    }

    /**
     * Serializes an event to JSON and sends it to the queue, tagging the message with the event
     * name and version taken from the class's {@code QueueEvent} annotation.
     *
     * @param obj the event to send; its class must be annotated with {@code QueueEvent}
     * @param i value passed to the request's SDK client execution timeout
     * @param i2 value passed to the request's SDK request timeout
     * @throws EventSerializationException if the class is not a {@code QueueEvent} or the event
     *     cannot be serialized
     * @throws EventProcessingException if the SQS client throws an {@code SdkClientException}
     */
    public void send(Object obj, int i, int i2) throws EventSerializationException, EventProcessingException {
        Class<?> eventClass = obj.getClass();
        QueueEvent annotation = eventClass.getAnnotation(QueueEvent.class);
        if (annotation == null) {
            throw new EventSerializationException(eventClass + " not a QueueEvent");
        }

        String eventName = annotation.name();
        String eventVersion = annotation.version();
        try {
            String body = mapper.writeValueAsString(obj);

            SendMessageRequest request = new SendMessageRequest();
            request.setQueueUrl(queueUrl);
            request.setMessageBody(body);
            request.setSdkClientExecutionTimeout(i);
            request.setSdkRequestTimeout(i2);
            request.addMessageAttributesEntry(ATTRIBUTE_EVENT_NAME, stringAttribute(eventName));
            request.addMessageAttributesEntry(ATTRIBUTE_EVENT_VERSION, stringAttribute(eventVersion));

            client.sendMessage(request);
        } catch (JsonProcessingException e) {
            throw new EventSerializationException("Failed to serialize event of type " + eventClass, e);
        } catch (SdkClientException e) {
            throw new EventProcessingException("Failed to send message due to connectivity issues.", e);
        }
    }

    /** Builds a message attribute of data type {@code String} holding the given value. */
    private static MessageAttributeValue stringAttribute(String value) {
        return new MessageAttributeValue().withDataType("String").withStringValue(value);
    }

    /**
     * Starts a background thread that polls the queue and dispatches messages to the given
     * handlers.
     *
     * <p>The reference to any previously created thread is overwritten; this method does not stop
     * that thread.
     *
     * @param queueEventHandlers handlers and inspectors to invoke for incoming events
     */
    public void subscribeToMessages(QueueEventHandlers queueEventHandlers) {
        MessageProcessingThread worker =
                new MessageProcessingThread(client, queueUrl, mapper, queueEventHandlers);
        messageProcessingThread = worker;
        worker.start();
    }

    /**
     * Stops the background processing thread and waits for it to finish.
     *
     * @throws IllegalStateException if {@code subscribeToMessages} has never been called
     * @throws InterruptedException if interrupted while stopping or joining the thread
     */
    public void unsubscribeFromMessages() throws InterruptedException {
        MessageProcessingThread worker = messageProcessingThread;
        if (worker == null) {
            throw new IllegalStateException("Cannot unsubscribe without first being subscribed");
        }
        worker.signalStop();
        worker.join();
    }
}
