package org.awsutils.sqs.listener;

import io.vavr.Tuple;
import io.vavr.Tuple3;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.awsutils.common.exceptions.UtilsException;
import org.awsutils.common.ratelimiter.RateLimiter;
import org.awsutils.common.ratelimiter.RateLimiterFactory;
import org.awsutils.common.util.ApplicationContextUtils;
import org.awsutils.common.util.Utils;
import org.awsutils.sqs.client.MessageConstants;
import org.awsutils.sqs.client.SqsMessageClient;
import org.awsutils.sqs.config.WorkerNodeCheckFunc;
import org.awsutils.sqs.handler.MessageHandlerFactory;
import org.awsutils.sqs.handler.SqsMessageHandler;
import org.awsutils.sqs.listener.SqsMessageListener;
import org.awsutils.sqs.message.MessageAttribute;
import org.awsutils.sqs.message.SnsSubscriptionMessage;
import org.awsutils.sqs.message.SqsMessage;
import org.awsutils.sqs.message.TaskInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:org/awsutils/sqs/listener/SqsMessageListenerImpl.class */
final class SqsMessageListenerImpl implements SqsMessageListener {
    // Queue name; passed to queueUrlFunc whenever the queue URL is needed.
    private final String queueName;
    // Spring Environment fetched via ApplicationContextUtils in the constructor; read there to evaluate the status property.
    private final Environment environment;
    // Queue URL: either supplied at construction or filled in by getQueueUrl on first use.
    private String queueUrl;
    // Stored by the constructor; not referenced anywhere else in this class.
    private final SqsAsyncClient sqsAsyncClient;
    // Synchronous SQS client used by receiveMessages to poll the queue.
    private final SqsClient sqsSyncClient;
    // Produces the SqsMessageHandler that handles each received message.
    private final MessageHandlerFactory messageHandlerFactory;
    // Used for queue-URL lookup, visibility changes and message deletion.
    private final SqsMessageClient sqsMessageClient;
    // Name looked up in RateLimiterFactory to obtain rateLimiter; may be empty (no rate limiting of dispatch).
    private final String rateLimiterName;
    // Executor that runs processMessage for every received message.
    private final ExecutorService executorService;
    // Key handed to propertyReaderFunction to read the per-cycle message cap; when empty, MAXIMUM_NUMBER_OF_MESSAGES applies.
    private final String maximumNumberOfMessagesKey;
    // Reads an Integer setting for a given key (used with maximumNumberOfMessagesKey).
    private final Function<String, Integer> propertyReaderFunction;
    // Gate evaluated at the start of receive(); defaults to "always true" when none is supplied.
    private final WorkerNodeCheckFunc workerNodeCheck;
    // Guards the polling loop in processUsingLock; defaults to a single-permit semaphore.
    private final Semaphore semaphore;
    // Listener name used in log messages; the builder substitutes a random UUID when none is given.
    private final String listenerName;
    // Name looked up in RateLimiterFactory to obtain messageHandlerRateLimiter; may be empty.
    private final String messageHandlerRateLimiterName;
    // Set on demand by setRateLimiters(); stays null when no rateLimiterName is configured.
    private RateLimiter rateLimiter;
    // Set on demand by setRateLimiters(); passed through to the message handler factory.
    private RateLimiter messageHandlerRateLimiter;
    // Maps the queue name to its URL: returns the configured URL, or delegates to getQueueUrl.
    private final Function<String, String> queueUrlFunc;
    // False only when a status property is configured and the Environment resolves it to false.
    private final boolean listenerEnabled;
    // Wait time handed to ReceiveMessageRequest.waitTimeSeconds; may be null.
    private final Integer waitTimeInSeconds;
    // Default cap on the running total of messages received in one receive() cycle.
    private static final int MAXIMUM_NUMBER_OF_MESSAGES = 2000;
    // Second logger for the same class as LOGGER; only setRateLimiters uses it.
    private static final Logger log = LoggerFactory.getLogger(SqsMessageListenerImpl.class);
    // Value passed as maxNumberOfMessages on each receive request.
    private static final Integer MAX_NUMBER_OF_SQS_MESSAGES = 10;
    // Timeout (in seconds) passed to Utils.executeUsingSemaphore.
    private static final long SEMAPHORE_TIMEOUT_IN_SECONDS = 15;
    // Evaluates to 900: the value 15 above is reinterpreted as minutes and converted to seconds.
    // Used both as the visibility timeout set on each message and as the staleness cutoff in processMessage.
    private static final long CHANGE_VISIBILITY_PERIOD_IN_SECONDS = TimeUnit.MINUTES.toSeconds(SEMAPHORE_TIMEOUT_IN_SECONDS);
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageListenerImpl.class);
    // NOTE(review): never referenced in this class — looks like a leftover; confirm before removing.
    private static final Thread SHUTDOWN_HOOK = new Thread();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/awsutils/sqs/listener/SqsMessageListenerImpl$SqsMessageListenerBuilder.class */
    /**
     * Fluent builder for {@link SqsMessageListenerImpl}.
     *
     * <p>Every setter stores its argument and returns this builder. {@link #build()} creates the
     * listener and returns it behind a JDK dynamic proxy that exposes only the
     * {@link SqsMessageListener} interface.
     */
    public static class SqsMessageListenerBuilder implements SqsMessageListener.Builder {
        private String queueName;
        private SqsAsyncClient sqsAsyncClient;
        private MessageHandlerFactory messageHandlerFactory;
        private SqsMessageClient sqsMessageClient;
        private ExecutorService executorService;
        private String maximumNumberOfMessagesKey;
        private Function<String, Integer> propertyReaderFunction;
        private WorkerNodeCheckFunc workerNodeCheck;
        private Semaphore semaphore;
        private String listenerName;
        private String rateLimiterName;
        private String messageHandlerRateLimiter;
        private String statusProperty;
        private Integer waitTimeInSeconds;
        private String queueUrl;
        private SqsClient sqsSyncClient;

        private SqsMessageListenerBuilder() {
        }

        /** Sets the queue name used to resolve the queue URL when no URL is given. */
        @Override
        public SqsMessageListener.Builder queueName(String str) {
            this.queueName = str;
            return this;
        }

        /** Sets an explicit queue URL; when non-empty the listener skips the URL lookup. */
        @Override
        public SqsMessageListener.Builder queueUrl(String str) {
            this.queueUrl = str;
            return this;
        }

        /** Sets the asynchronous SQS client handed to the listener. */
        @Override
        public SqsMessageListener.Builder sqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
            this.sqsAsyncClient = sqsAsyncClient;
            return this;
        }

        /** Sets the factory that creates a handler for each received message. */
        @Override
        public SqsMessageListener.Builder messageHandlerFactory(MessageHandlerFactory messageHandlerFactory) {
            this.messageHandlerFactory = messageHandlerFactory;
            return this;
        }

        /** Sets the client used for URL lookup, visibility changes and deletes. */
        @Override
        public SqsMessageListener.Builder sqsMessageClient(SqsMessageClient sqsMessageClient) {
            this.sqsMessageClient = sqsMessageClient;
            return this;
        }

        /** Sets the executor on which received messages are processed. */
        @Override
        public SqsMessageListener.Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        /** Sets the key passed to the property reader to obtain the per-cycle message cap. */
        @Override
        public SqsMessageListener.Builder maximumNumberOfMessagesKey(String str) {
            this.maximumNumberOfMessagesKey = str;
            return this;
        }

        /** Sets the function that resolves an integer setting from a key. */
        @Override
        public SqsMessageListener.Builder propertyReaderFunction(Function<String, Integer> function) {
            this.propertyReaderFunction = function;
            return this;
        }

        /** Sets the check consulted before each polling cycle; null means "always poll". */
        @Override
        public SqsMessageListener.Builder workerNodeCheck(WorkerNodeCheckFunc workerNodeCheckFunc) {
            this.workerNodeCheck = workerNodeCheckFunc;
            return this;
        }

        /** Sets the semaphore guarding the polling loop; null means a single-permit default. */
        @Override
        public SqsMessageListener.Builder semaphore(Semaphore semaphore) {
            this.semaphore = semaphore;
            return this;
        }

        /** Sets the listener name used in logs; a random UUID is used when empty. */
        @Override
        public SqsMessageListener.Builder listenerName(String str) {
            this.listenerName = str;
            return this;
        }

        /** Sets the name of the rate limiter applied when dispatching messages. */
        @Override
        public SqsMessageListener.Builder rateLimiterName(String str) {
            this.rateLimiterName = str;
            return this;
        }

        /** Sets the name of the rate limiter passed on to message handlers. */
        @Override
        public SqsMessageListener.Builder messageHandlerRateLimiter(String str) {
            this.messageHandlerRateLimiter = str;
            return this;
        }

        /** Sets the environment property name that enables or disables the listener. */
        @Override
        public SqsMessageListener.Builder statusProperty(String str) {
            this.statusProperty = str;
            return this;
        }

        /** Sets the wait time, in seconds, used on each receive request. */
        @Override
        public SqsMessageListener.Builder waitTimeInSeconds(Integer num) {
            this.waitTimeInSeconds = num;
            return this;
        }

        /** Sets the synchronous SQS client used to poll the queue. */
        @Override
        public SqsMessageListener.Builder sqsSyncClient(SqsClient sqsClient) {
            this.sqsSyncClient = sqsClient;
            return this;
        }

        /**
         * Creates the listener from the values collected so far.
         *
         * @return a proxy implementing {@link SqsMessageListener} that forwards every call to
         *         the newly created listener instance
         */
        @Override
        public SqsMessageListener build() {
            final String resolvedListenerName = StringUtils.hasLength(this.listenerName)
                    ? this.listenerName.trim()
                    : UUID.randomUUID().toString();
            final SqsMessageListenerImpl delegate = new SqsMessageListenerImpl(
                    this.sqsAsyncClient,
                    this.queueName,
                    this.queueUrl,
                    this.sqsSyncClient,
                    this.sqsMessageClient,
                    this.messageHandlerFactory,
                    this.executorService,
                    this.rateLimiterName,
                    this.maximumNumberOfMessagesKey,
                    this.semaphore,
                    this.propertyReaderFunction,
                    this.workerNodeCheck,
                    resolvedListenerName,
                    this.messageHandlerRateLimiter,
                    this.statusProperty,
                    this.waitTimeInSeconds);
            return (SqsMessageListener) Proxy.newProxyInstance(
                    getClass().getClassLoader(),
                    new Class<?>[] {SqsMessageListener.class},
                    (proxy, method, args) -> {
                        try {
                            return method.invoke(delegate, args);
                        } catch (InvocationTargetException e) {
                            // Reflection wraps whatever the delegate threw. Rethrow the real
                            // exception so callers do not receive it wrapped in
                            // UndeclaredThrowableException.
                            throw e.getCause() != null ? e.getCause() : e;
                        }
                    });
        }
    }

    /**
     * Wires up a listener. Only the builder calls this.
     *
     * <p>A missing worker-node check defaults to one that always returns true, and a missing
     * semaphore defaults to a single-permit one. The listener is enabled unless
     * {@code statusProperty} is non-empty and the Spring {@link Environment} resolves it to
     * {@code false}. When {@code queueUrl} is empty, the URL is looked up from the queue name on
     * first use.
     */
    private SqsMessageListenerImpl(SqsAsyncClient sqsAsyncClient, String queueName, String queueUrl, SqsClient sqsSyncClient, SqsMessageClient sqsMessageClient, MessageHandlerFactory messageHandlerFactory, ExecutorService executorService, String rateLimiterName, String maximumNumberOfMessagesKey, Semaphore semaphore, Function<String, Integer> propertyReaderFunction, WorkerNodeCheckFunc workerNodeCheck, String listenerName, String messageHandlerRateLimiterName, String statusProperty, Integer waitTimeInSeconds) {
        // Plain pass-through collaborators and settings.
        this.sqsAsyncClient = sqsAsyncClient;
        this.sqsSyncClient = sqsSyncClient;
        this.sqsMessageClient = sqsMessageClient;
        this.messageHandlerFactory = messageHandlerFactory;
        this.executorService = executorService;
        this.propertyReaderFunction = propertyReaderFunction;
        this.maximumNumberOfMessagesKey = maximumNumberOfMessagesKey;
        this.rateLimiterName = rateLimiterName;
        this.messageHandlerRateLimiterName = messageHandlerRateLimiterName;
        this.listenerName = listenerName;
        this.waitTimeInSeconds = waitTimeInSeconds;

        // Optional collaborators with defaults.
        if (workerNodeCheck != null) {
            this.workerNodeCheck = workerNodeCheck;
        } else {
            this.workerNodeCheck = () -> true;
        }
        if (semaphore != null) {
            this.semaphore = semaphore;
        } else {
            this.semaphore = new Semaphore(1);
        }

        // Enabled by default; the status property can switch the listener off.
        this.environment = (Environment) ApplicationContextUtils.getInstance().getBean(Environment.class);
        this.listenerEnabled = !StringUtils.hasLength(statusProperty)
                || this.environment.getProperty(statusProperty, Boolean.class, Boolean.TRUE);

        this.queueName = queueName;
        this.queueUrl = queueUrl;
        if (StringUtils.hasLength(queueUrl)) {
            this.queueUrlFunc = ignoredName -> this.queueUrl;
        } else {
            this.queueUrlFunc = name -> getQueueUrl(sqsMessageClient, name);
        }

        LOGGER.info("Creating SqsMessageListener: {}, Queue: {}", listenerName, queueName);
    }

    /**
     * Returns the queue URL, asking {@code sqsMessageClient} for it the first time it is needed
     * and storing the answer in {@code queueUrl}.
     *
     * @param sqsMessageClient client used to look up the URL
     * @param name             queue name to resolve
     * @return the stored queue URL
     */
    private String getQueueUrl(SqsMessageClient sqsMessageClient, String name) {
        if (StringUtils.hasLength(this.queueUrl)) {
            return this.queueUrl;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have resolved the URL meanwhile.
            if (!StringUtils.hasLength(this.queueUrl)) {
                this.queueUrl = sqsMessageClient.getQueueUrl(name);
            }
            return this.queueUrl;
        }
    }

    /**
     * Runs one polling cycle: resolves the rate limiters, then polls and dispatches messages
     * while holding the listener's semaphore.
     *
     * <p>Does nothing when the listener is disabled or the worker-node check returns false.
     * The worker-node check is not evaluated for a disabled listener.
     */
    @Override // org.awsutils.sqs.listener.SqsMessageListener
    public void receive() {
        if (!this.listenerEnabled) {
            LOGGER.debug("Not receiving messages since listener [{}] is disabled", this.listenerName);
            return;
        }
        if (!this.workerNodeCheck.check()) {
            LOGGER.debug("Not receiving messages since worker node check returned false");
            return;
        }
        setRateLimiters();
        LOGGER.debug("Receiving messages after starter in listener [{}]", this.listenerName);
        try {
            processUsingLock();
        } catch (InterruptedException e) {
            // Interrupt handling is delegated to the shared helper; no extra action is needed here.
            Utils.handleInterruptedException(e, () -> {
            });
        }
    }

    /**
     * Resolves the two optional rate limiters from {@link RateLimiterFactory} if they are not
     * set yet and a name is configured for them.
     *
     * <p>Any failure is logged. A {@link RuntimeException} is then swallowed, leaving the
     * unresolved limiter {@code null}; any other exception is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private void setRateLimiters() {
        try {
            final boolean dispatchLimiterMissing = this.rateLimiter == null;
            if (dispatchLimiterMissing && StringUtils.hasLength(this.rateLimiterName)) {
                this.rateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.rateLimiterName);
            }
            final boolean handlerLimiterMissing = this.messageHandlerRateLimiter == null;
            if (handlerLimiterMissing && StringUtils.hasLength(this.messageHandlerRateLimiterName)) {
                this.messageHandlerRateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.messageHandlerRateLimiterName);
            }
        } catch (Exception e) {
            log.error("Exception: {}", e, e);
            if (e instanceof RuntimeException) {
                return;
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Polls the queue repeatedly via {@code Utils.executeUsingSemaphore}, dispatching each
     * batch, until {@link #processSqsMessages} reports that polling should stop.
     *
     * @throws InterruptedException propagated from {@code Utils.executeUsingSemaphore}
     */
    private void processUsingLock() throws InterruptedException {
        Utils.executeUsingSemaphore(this.semaphore, SEMAPHORE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS, () -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Checking for messages from SQS in listener [{0}]: {1}", this.listenerName, this.queueUrlFunc.apply(this.queueName)));
            }
            int receivedSoFar = 0;
            boolean keepPolling;
            do {
                final List<Message> batch = receiveMessages();
                receivedSoFar += batch.size();
                keepPolling = processSqsMessages(batch, receivedSoFar);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(MessageFormat.format("Proceed with receiving messages [{0}]: {1}", this.listenerName, keepPolling));
                }
            } while (keepPolling);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Total number of messages received: {0}", receivedSoFar));
                final Object limiterName = this.rateLimiter == null ? null : this.rateLimiter.getRateLimiterName();
                LOGGER.debug(MessageFormat.format("Rate limiter used: {0}", limiterName));
            }
        });
    }

    /**
     * Dispatches one received batch and decides whether polling should continue.
     *
     * @param list batch returned by the last receive call
     * @param i    running total of messages received in this cycle, including this batch
     * @return {@code false} when the batch is empty or the running total has reached the
     *         configured maximum; {@code true} otherwise
     */
    private boolean processSqsMessages(List<Message> list, int i) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("In processSqsMessages: " + list);
        }
        if (CollectionUtils.isEmpty(list)) {
            if (LOGGER.isDebugEnabled()) {
                final int size = list != null ? list.size() : 0;
                LOGGER.debug(MessageFormat.format("List of messages is empty in listener [{0}]: {1}", this.listenerName, size));
            }
            return false;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Message list is not empty..");
        }
        list.forEach(this::processSqsMessage);
        return resolveMaximumNumberOfMessages() > i;
    }

    /**
     * Returns the per-cycle message cap. Falls back to {@code MAXIMUM_NUMBER_OF_MESSAGES} when
     * no key is configured, no property reader is available, or the reader yields no value,
     * so that a missing setting cannot abort the polling loop with a NullPointerException.
     */
    private int resolveMaximumNumberOfMessages() {
        if (!StringUtils.hasLength(this.maximumNumberOfMessagesKey) || this.propertyReaderFunction == null) {
            return MAXIMUM_NUMBER_OF_MESSAGES;
        }
        final Integer configured = this.propertyReaderFunction.apply(this.maximumNumberOfMessagesKey);
        return configured != null ? configured : MAXIMUM_NUMBER_OF_MESSAGES;
    }

    /**
     * Sets the message's visibility timeout to {@code CHANGE_VISIBILITY_PERIOD_IN_SECONDS} and
     * submits it to the executor, going through the dispatch rate limiter when one is set.
     *
     * @param message the received SQS message
     */
    private void processSqsMessage(Message message) {
        // Taken before the visibility change; processMessage uses it to detect stale messages.
        final long receivedAtMillis = System.currentTimeMillis();
        this.sqsMessageClient.changeVisibilitySync(this.queueUrlFunc.apply(this.queueName), message.receiptHandle(), Integer.valueOf((int) CHANGE_VISIBILITY_PERIOD_IN_SECONDS));
        final Runnable dispatch = () -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Processing message: " + message.messageId());
            }
            this.executorService.submit(() -> {
                processMessage(message, receivedAtMillis);
            });
        };
        if (this.rateLimiter != null) {
            this.rateLimiter.execute(dispatch::run);
        } else {
            dispatch.run();
        }
    }

    /**
     * Handles one message on an executor thread.
     *
     * <p>A message is skipped when {@code CHANGE_VISIBILITY_PERIOD_IN_SECONDS} or more have
     * passed since it was received. Otherwise a handler is obtained from the factory and
     * invoked. The body is treated as an {@code SqsMessage} wrapper when the wrapper-present
     * attribute is "true" or the body contains {@code "messageType}; otherwise the message type
     * is taken from the message attributes.
     *
     * <p>A {@link UtilsException} goes to {@link #handleUtilsException}; any other exception is
     * logged and not rethrown.
     *
     * @param message the received SQS message
     * @param j       time in epoch milliseconds at which the message was received
     */
    private void processMessage(Message message, long j) {
        try {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Processing message in listener[{0}]: {1}", this.listenerName, message));
            }
            final long elapsedSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - j);
            if (elapsedSeconds >= CHANGE_VISIBILITY_PERIOD_IN_SECONDS) {
                // The visibility timeout set at receive time has run out; leave the message alone.
                LOGGER.debug("Skipping message {} in listener [{}]: {}s elapsed since it was received", message.messageId(), this.listenerName, elapsedSeconds);
                return;
            }

            final String body = message.body();
            final String receiptHandle = message.receiptHandle();
            // Always resolve through queueUrlFunc so both branches below use the same URL source.
            final String resolvedQueueUrl = this.queueUrlFunc.apply(this.queueName);
            final String receiveCountText = message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT);
            final Integer receiveCount = StringUtils.hasLength(receiveCountText) ? Integer.parseInt(receiveCountText) : 0;
            final Map<String, String> attributes = stringAttributesOf(message);

            final boolean wrapperFlagged = "true".equalsIgnoreCase(attributes.get(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT));
            SqsMessageHandler messageHandler;
            if (wrapperFlagged || body.contains("\"messageType")) {
                final Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> parsed = constructSqsMessage(body, receiptHandle);
                // Prefer the SQS attributes; fall back to those carried inside an SNS notification.
                final Map<String, String> effectiveAttributes = CollectionUtils.isEmpty(attributes) ? parsed._2() : attributes;
                messageHandler = this.messageHandlerFactory.getMessageHandler((SqsMessage) parsed._1(), receiptHandle, resolvedQueueUrl, receiveCount, effectiveAttributes, this.messageHandlerRateLimiter);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(MessageFormat.format("Handling message by {0}", messageHandler));
                }
            } else {
                final String messageType = attributes.get(MessageConstants.MESSAGE_TYPE);
                if (!StringUtils.hasLength(messageType)) {
                    throw new UtilsException("INVALID_MESSAGE", "The message body should be of SqsMessage type or should contain `messageType` attribute");
                }
                messageHandler = this.messageHandlerFactory.getMessageHandler(body, messageType, attributes.get(MessageConstants.TRANSACTION_ID), receiptHandle, resolvedQueueUrl, receiveCount, attributes, this.messageHandlerRateLimiter);
            }
            messageHandler.handle();
        } catch (UtilsException e) {
            handleUtilsException(message, e);
        } catch (Exception e2) {
            LOGGER.error(MessageFormat.format("Exception in listener[{0}]: {1}", this.listenerName, e2.getMessage()), e2);
        }
    }

    /**
     * Flattens the message attributes to name/string-value pairs. Attributes without a string
     * value (for example binary attributes) are left out, because {@code Collectors.toMap}
     * rejects null values with a NullPointerException.
     */
    private static Map<String, String> stringAttributesOf(Message message) {
        return message.messageAttributes().entrySet().stream()
                .filter(entry -> entry.getValue() != null && entry.getValue().stringValue() != null)
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().stringValue()));
    }

    /**
     * Logs a {@link UtilsException} raised while processing a message. When the error type is
     * {@code NO_HANDLER_FOR_MESSAGE_TYPE} or {@code INVALID_JSON} (case-insensitive), the
     * message is also deleted from the queue.
     *
     * @param message        the message being processed when the exception was thrown
     * @param utilsException the exception to report
     */
    private void handleUtilsException(Message message, UtilsException utilsException) {
        final String errorType = utilsException.getErrorType();
        final boolean deleteFromQueue = "NO_HANDLER_FOR_MESSAGE_TYPE".equalsIgnoreCase(errorType)
                || "INVALID_JSON".equalsIgnoreCase(errorType);
        LOGGER.error(MessageFormat.format("Exception in listener[{0}]: {1}", this.listenerName, utilsException.getMessage()), utilsException);
        if (deleteFromQueue) {
            this.sqsMessageClient.deleteMessageSync(this.queueUrlFunc.apply(this.queueName), message.receiptHandle());
        }
    }

    /**
     * Parses a message body into an {@code SqsMessage}.
     *
     * <p>A body containing {@code "Type" : "Notification"} is delegated to
     * {@link #processSnsNotification}. Any other body is parsed directly as an
     * {@code SqsMessage}, with parse failures mapped to an {@code INVALID_JSON}
     * {@link UtilsException}.
     *
     * @param str  the message body
     * @param str2 the message's receipt handle
     * @return the parsed message; the attribute map and task input are {@code null} for a
     *         directly parsed body
     */
    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> constructSqsMessage(String str, String str2) {
        if (str.contains("\"Type\" : \"Notification\"")) {
            return processSnsNotification(str, str2);
        }
        final SqsMessage parsed = (SqsMessage) Utils.constructFromJson(SqsMessage.class, str, cause -> new UtilsException("INVALID_JSON", cause));
        return Tuple.of(parsed, null, null);
    }

    /**
     * Unwraps an SNS notification delivered through SQS.
     *
     * <p>The inner message is parsed as a {@code TaskInput} when it contains {@code "Input"},
     * otherwise as an {@code SqsMessage}. A {@code TaskInput} without an input is deleted from
     * the queue and rejected.
     *
     * @param str  the SQS message body holding the SNS notification JSON
     * @param str2 the receipt handle, used if the message has to be deleted
     * @return the parsed message, the SNS message attributes flattened to strings
     *         ({@code null} when the notification carries none), and the task input
     *         ({@code null} when the inner message was a plain {@code SqsMessage})
     * @throws UtilsException {@code EMPTY_MESSAGE_BODY} when the task input is empty, or
     *                        {@code INVALID_JSON} when the inner message cannot be parsed
     */
    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> processSnsNotification(String str, String str2) {
        final SnsSubscriptionMessage snsSubscriptionMessage = (SnsSubscriptionMessage) Utils.constructFromJson(SnsSubscriptionMessage.class, str);
        final String message = snsSubscriptionMessage.getMessage();
        final Map<String, MessageAttribute> messageAttributes = snsSubscriptionMessage.getMessageAttributes();

        TaskInput taskInput;
        SqsMessage sqsMessage;
        if (message.contains("\"Input\"")) {
            taskInput = (TaskInput) Utils.constructFromJson(TaskInput.class, message);
            if (taskInput.getInput() == null) {
                // Delete by queue URL, like every other delete in this class (not by queue name).
                this.sqsMessageClient.deleteMessageSync(this.queueUrlFunc.apply(this.queueName), str2);
                throw new UtilsException("EMPTY_MESSAGE_BODY", "Empty sqs message body");
            }
            sqsMessage = taskInput.getInput();
        } else {
            taskInput = null;
            sqsMessage = (SqsMessage) Utils.constructFromJson(SqsMessage.class, message, th -> {
                return new UtilsException("INVALID_JSON", th);
            });
        }

        Map<String, String> attributes = null;
        if (!CollectionUtils.isEmpty(messageAttributes)) {
            // Collectors.toMap rejects null values, so attributes without a value are left out.
            attributes = messageAttributes.entrySet().stream()
                    .filter(entry -> entry.getValue() != null && entry.getValue().getValue() != null)
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getValue()));
        }
        return Tuple.of(sqsMessage, attributes, taskInput);
    }

    /**
     * Issues one receive request against the queue, asking for all attributes and all message
     * attributes, at most {@code MAX_NUMBER_OF_SQS_MESSAGES} messages, with the configured
     * wait time.
     *
     * @return the messages contained in the response
     */
    private List<Message> receiveMessages() {
        final ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                .queueUrl(this.queueUrlFunc.apply(this.queueName))
                .attributeNames(QueueAttributeName.ALL)
                .messageAttributeNames("All")
                .maxNumberOfMessages(MAX_NUMBER_OF_SQS_MESSAGES)
                .waitTimeSeconds(this.waitTimeInSeconds)
                .build();
        return this.sqsSyncClient.receiveMessage(request).messages();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Entry point for creating a listener.
     *
     * @return a new, empty {@link SqsMessageListenerBuilder}
     */
    public static SqsMessageListener.Builder builder() {
        final SqsMessageListener.Builder freshBuilder = new SqsMessageListenerBuilder();
        return freshBuilder;
    }
}
