package org.awsutils.sqs.listener;

import io.vavr.Tuple;
import io.vavr.Tuple3;
import java.math.BigInteger;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.awsutils.common.exceptions.UtilsException;
import org.awsutils.common.ratelimiter.RateLimiter;
import org.awsutils.common.ratelimiter.RateLimiterFactory;
import org.awsutils.common.util.ApplicationContextUtils;
import org.awsutils.common.util.Utils;
import org.awsutils.sqs.client.MessageConstants;
import org.awsutils.sqs.client.SyncSqsMessageClient;
import org.awsutils.sqs.config.WorkerNodeCheckFunc;
import org.awsutils.sqs.handler.MessageHandlerFactory;
import org.awsutils.sqs.handler.SqsMessageHandler;
import org.awsutils.sqs.listener.SqsMessageListener;
import org.awsutils.sqs.message.MessageAttribute;
import org.awsutils.sqs.message.SnsSubscriptionMessage;
import org.awsutils.sqs.message.SqsMessage;
import org.awsutils.sqs.message.TaskInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

/* loaded from: input_file:org/awsutils/sqs/listener/SqsMessageListenerImpl.class */
final class SqsMessageListenerImpl implements SqsMessageListener {
    /** Spring environment, fetched from the application context in the constructor to read the listener-enabled flag. */
    private final Environment environment;
    /**
     * URL of the queue this listener polls. Volatile because {@code getQueueUrl} tests it outside
     * the lock before (re)initializing it inside the lock.
     */
    private volatile String queueUrl;
    /** SQS client used to issue the receive-message requests. */
    private final SqsClient sqsSyncClient;
    /** Factory that produces the handler for each received message. */
    private final MessageHandlerFactory messageHandlerFactory;
    /** Client used for visibility changes, message deletion and queue URL lookup. */
    private final SyncSqsMessageClient syncSqsMessageClient;
    /** Name under which the listener rate limiter is looked up in {@code RateLimiterFactory}; may be empty. */
    private final String rateLimiterName;
    /** Executor to which the processing of each individual message is submitted. */
    private final ExecutorService executorService;
    /** Key handed to {@link #propertyReaderFunction} to read the per-cycle message cap; may be empty. */
    private final String maximumNumberOfMessagesKey;
    /** Reads an integer property by key; used for the per-cycle message cap. */
    private final Function<String, Integer> propertyReaderFunction;
    /** Consulted on every {@code receive()} call; polling only happens when it returns true. */
    private final WorkerNodeCheckFunc workerNodeCheck;
    /** Semaphore guarding a polling cycle (passed to {@code Utils.executeUsingSemaphore}). */
    private final Semaphore semaphore;
    /** Listener name, used in log messages. */
    private final String listenerName;
    /** Name under which the handler rate limiter is looked up in {@code RateLimiterFactory}; may be empty. */
    private final String messageHandlerRateLimiterName;
    /**
     * Rate limiter wrapped around message dispatch. Starts as a {@code PassthroughRateLimiter} and is
     * replaced by {@code setRateLimiters()}, which the constructor schedules on a timer thread.
     * Volatile so the polling threads are guaranteed to observe that replacement.
     */
    private volatile RateLimiter rateLimiter = new PassthroughRateLimiter();
    /**
     * Rate limiter handed to every message handler. Same life cycle as {@link #rateLimiter}, hence
     * volatile for the same reason (it is read on executor threads).
     */
    private volatile RateLimiter messageHandlerRateLimiter = new PassthroughRateLimiter();
    /** Whether this listener polls at all; resolved once in the constructor. */
    private final boolean listenerEnabled;
    /** Wait time set on each receive-message request. */
    private final Integer waitTimeInSeconds;
    /** Per-cycle message cap used when no {@link #maximumNumberOfMessagesKey} is configured. */
    private static final int MAXIMUM_NUMBER_OF_MESSAGES = 2000;
    private static final Logger log = LoggerFactory.getLogger(SqsMessageListenerImpl.class);
    /** Maximum number of messages requested per receive-message call. */
    private static final Integer MAX_NUMBER_OF_SQS_MESSAGES = 10;
    /** Timeout passed to {@code Utils.executeUsingSemaphore} for a polling cycle, in seconds. */
    private static final long SEMAPHORE_TIMEOUT_IN_SECONDS = 15;
    /**
     * Fifteen minutes expressed in seconds: the visibility timeout applied to each received message
     * and the maximum age at which a queued message is still processed.
     */
    private static final long CHANGE_VISIBILITY_PERIOD_IN_SECONDS = TimeUnit.MINUTES.toSeconds(15L);
    /** NOTE(review): not referenced anywhere in this class - candidate for removal. */
    private static final Thread SHUTDOWN_HOOK = new Thread();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SqsMessageListenerImpl(String str, SqsClient sqsClient, MessageHandlerFactory messageHandlerFactory, SyncSqsMessageClient syncSqsMessageClient, ExecutorService executorService, String str2, String str3, Semaphore semaphore, Function<String, Integer> function, WorkerNodeCheckFunc workerNodeCheckFunc, String str4, String str5, String str6, Integer num) {
        Timer timer = new Timer();
        this.sqsSyncClient = sqsClient;
        this.syncSqsMessageClient = syncSqsMessageClient;
        this.rateLimiterName = str2;
        this.messageHandlerRateLimiterName = str5;
        this.waitTimeInSeconds = num;
        this.messageHandlerFactory = messageHandlerFactory;
        this.propertyReaderFunction = function;
        this.listenerName = str4;
        this.executorService = executorService;
        this.maximumNumberOfMessagesKey = str3;
        this.workerNodeCheck = workerNodeCheckFunc == null ? () -> {
            return true;
        } : workerNodeCheckFunc;
        this.semaphore = semaphore == null ? new Semaphore(BigInteger.ONE.intValue()) : semaphore;
        this.environment = (Environment) ApplicationContextUtils.getInstance().getBean(Environment.class);
        this.listenerEnabled = StringUtils.hasLength(str6) ? ((Boolean) this.environment.getProperty(str6, Boolean.class, true)).booleanValue() : true;
        this.queueUrl = str;
        if (!StringUtils.hasLength(str)) {
            throw new IllegalStateException("QueueUrl is required");
        }
        if (log.isInfoEnabled()) {
            log.info("Creating SqsMessageListener: {}, Queue: {}", str4, str);
        }
        timer.schedule(new DefaultTimerTask(this::setRateLimiters), 10000L);
    }

    /**
     * Returns the queue URL, resolving it through the given client when the field is still empty.
     *
     * @param syncSqsMessageClient client asked for the URL when none is stored yet
     * @param str                  identifier passed to {@code getQueueUrl} on the client
     *                             (presumably the queue name - confirm against the client API)
     * @return the stored or freshly resolved queue URL
     */
    private String getQueueUrl(SyncSqsMessageClient syncSqsMessageClient, String str) {
        if (StringUtils.hasLength(this.queueUrl)) {
            return this.queueUrl;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have resolved the URL in the meantime.
            if (!StringUtils.hasLength(this.queueUrl)) {
                this.queueUrl = syncSqsMessageClient.getQueueUrl(str);
            }
            return this.queueUrl;
        }
    }

    /**
     * Runs one polling cycle, provided the listener is enabled and the worker node check passes.
     * An interruption during the cycle is delegated to {@code Utils.handleInterruptedException}.
     */
    @Override // org.awsutils.sqs.listener.SqsMessageListener
    public void receive() {
        try {
            // Short-circuit keeps the worker node check from running for a disabled listener.
            boolean shouldPoll = this.listenerEnabled && this.workerNodeCheck.check();
            if (!shouldPoll) {
                log.debug("Not receiving messages since worker node check returned false");
                return;
            }
            log.debug("Receiving messages after starter in listener [{}]", this.listenerName);
            processUsingLock();
        } catch (InterruptedException interrupted) {
            Utils.handleInterruptedException(interrupted, () -> {
            });
        }
    }

    /**
     * Looks up the configured rate limiters by name and stores them in the corresponding fields.
     * A limiter whose name is empty is left untouched. Runtime failures are logged and suppressed;
     * any other exception is logged and rethrown wrapped in a {@link RuntimeException}.
     */
    private void setRateLimiters() {
        try {
            if (StringUtils.hasLength(this.rateLimiterName)) {
                this.rateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.rateLimiterName);
            }
            if (StringUtils.hasLength(this.messageHandlerRateLimiterName)) {
                this.messageHandlerRateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.messageHandlerRateLimiterName);
            }
        } catch (RuntimeException unchecked) {
            // Best effort: the listener keeps whatever limiters it currently has.
            log.error("Exception: {}", unchecked, unchecked);
        } catch (Exception checked) {
            log.error("Exception: {}", checked, checked);
            throw new RuntimeException(checked);
        }
    }

    /**
     * Runs the receive/process loop through {@code Utils.executeUsingSemaphore}, using the listener's
     * semaphore and {@code SEMAPHORE_TIMEOUT_IN_SECONDS}. Batches are fetched and processed until
     * {@code processSqsMessages} reports that polling should stop.
     *
     * @throws InterruptedException propagated from {@code Utils.executeUsingSemaphore}
     */
    private void processUsingLock() throws InterruptedException {
        Utils.executeUsingSemaphore(this.semaphore, SEMAPHORE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS, () -> {
            log.debug("Checking for messages from SQS in listener [{}]: {}", this.listenerName, this.queueUrl);
            int totalReceived = 0;
            boolean keepPolling;
            do {
                List<Message> batch = receiveMessages();
                totalReceived += batch.size();
                keepPolling = processSqsMessages(batch, totalReceived);
                log.debug("Proceed with receiving messages [{}]: {}", this.listenerName, keepPolling);
            } while (keepPolling);
            if (log.isDebugEnabled()) {
                String limiterName = this.rateLimiter == null ? null : this.rateLimiter.getRateLimiterName();
                log.debug("Total number of messages received: {}", totalReceived);
                log.debug("Rate limiter used: {}", limiterName);
            }
        });
    }

    /**
     * Dispatches every message of a received batch and decides whether polling should continue.
     *
     * @param list batch returned by the last receive call; may be empty
     * @param i    number of messages received so far in this polling cycle, this batch included
     * @return {@code false} when the batch is empty or the per-cycle cap has been reached,
     *         {@code true} otherwise
     */
    private boolean processSqsMessages(List<Message> list, int i) {
        // Parameterized so the batch is only rendered when debug logging is enabled.
        log.debug("In processSqsMessages: {}", list);
        if (CollectionUtils.isEmpty(list)) {
            log.debug("List of messages is empty in listener [{}]: {}", this.listenerName, 0);
            return false;
        }
        log.debug("Message list is not empty..");
        list.forEach(this::processSqsMessage);
        // The cap is resolved after dispatching, as before, so a changed property takes effect per batch.
        return resolveMaximumNumberOfMessages() > i;
    }

    /**
     * Resolves the per-cycle message cap: the configured property when a key is set and yields a
     * value, otherwise {@code MAXIMUM_NUMBER_OF_MESSAGES}.
     */
    private int resolveMaximumNumberOfMessages() {
        if (!StringUtils.hasLength(this.maximumNumberOfMessagesKey) || this.propertyReaderFunction == null) {
            return MAXIMUM_NUMBER_OF_MESSAGES;
        }
        Integer configured = this.propertyReaderFunction.apply(this.maximumNumberOfMessagesKey);
        // A missing property must not abort the polling cycle with a NullPointerException.
        return configured != null ? configured : MAXIMUM_NUMBER_OF_MESSAGES;
    }

    /**
     * Extends the visibility timeout of a received message to
     * {@code CHANGE_VISIBILITY_PERIOD_IN_SECONDS} and submits its processing to the executor,
     * going through the listener rate limiter when one is set.
     *
     * @param message the received SQS message
     */
    private void processSqsMessage(Message message) {
        // Captured before the visibility change; processMessage uses it to drop messages that waited too long.
        long currentTimeMillis = System.currentTimeMillis();
        this.syncSqsMessageClient.changeVisibility(this.queueUrl, message.receiptHandle(), Integer.valueOf((int) CHANGE_VISIBILITY_PERIOD_IN_SECONDS));
        Runnable dispatch = () -> {
            log.debug("Processing message: {}", message.messageId());
            this.executorService.submit(() -> {
                processMessage(message, currentTimeMillis);
            });
        };
        // Read the field once: setRateLimiters() may replace it from the timer thread, so a
        // separate null check and use could act on two different values.
        RateLimiter limiter = this.rateLimiter;
        if (limiter == null) {
            dispatch.run();
            return;
        }
        limiter.execute(dispatch::run);
    }

    /**
     * Resolves the handler for a message and invokes it.
     *
     * <p>A message whose body is an {@code SqsMessage} wrapper (flagged by the
     * {@code SQS_MESSAGE_WRAPPER_PRESENT} attribute or containing {@code "messageType}) is parsed via
     * {@code constructSqsMessage}; otherwise the message type is taken from the message attributes.
     * Messages that waited {@code CHANGE_VISIBILITY_PERIOD_IN_SECONDS} or longer since receipt are
     * skipped. {@code UtilsException}s go to {@code handleUtilsException}; every other exception is
     * logged and suppressed.
     *
     * @param message the received SQS message
     * @param j       time in epoch milliseconds at which the message was received
     */
    private void processMessage(Message message, long j) {
        try {
            log.debug("Processing message in listener[{}]: {}", this.listenerName, message);
            long ageInSeconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - j);
            if (ageInSeconds >= CHANGE_VISIBILITY_PERIOD_IN_SECONDS) {
                // The extended visibility window has elapsed, so the message is left for redelivery.
                log.debug("Skipping message {} in listener[{}]: waited {} seconds before processing", message.messageId(), this.listenerName, ageInSeconds);
                return;
            }
            String body = message.body();
            String receiptHandle = message.receiptHandle();
            String receiveCountText = message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT);
            Integer receiveCount = StringUtils.hasLength(receiveCountText) ? Integer.parseInt(receiveCountText) : 0;
            // Collectors.toMap rejects null values, so attributes without a string value
            // (e.g. binary ones) are left out instead of failing the whole message.
            Map<String, String> attributes = message.messageAttributes().entrySet().stream()
                    .filter(entry -> entry.getValue().stringValue() != null)
                    .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().stringValue()));
            boolean wrapped = "true".equalsIgnoreCase(attributes.get(MessageConstants.SQS_MESSAGE_WRAPPER_PRESENT))
                    || body.contains("\"messageType");

            SqsMessageHandler messageHandler;
            if (wrapped) {
                Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> parsed = constructSqsMessage(body, receiptHandle);
                // Prefer the SQS message attributes; fall back to those carried inside the parsed payload.
                Map<String, String> effectiveAttributes = CollectionUtils.isEmpty(attributes) ? parsed._2() : attributes;
                messageHandler = this.messageHandlerFactory.getMessageHandler((SqsMessage) parsed._1(), receiptHandle, this.queueUrl, receiveCount, effectiveAttributes, this.messageHandlerRateLimiter);
                log.debug("Handling message by {}", messageHandler);
            } else {
                String messageType = attributes.get(MessageConstants.MESSAGE_TYPE);
                if (!StringUtils.hasLength(messageType)) {
                    throw new UtilsException("INVALID_MESSAGE", "The message body should be of SqsMessage type or should contain `messageType` attribute");
                }
                messageHandler = this.messageHandlerFactory.getMessageHandler(body, messageType, attributes.get(MessageConstants.TRANSACTION_ID), receiptHandle, this.queueUrl, receiveCount, attributes, this.messageHandlerRateLimiter);
            }
            messageHandler.handle();
        } catch (UtilsException e) {
            // Must precede the generic handler, otherwise this clause can never be reached.
            handleUtilsException(message, e);
        } catch (Exception e) {
            log.error("Exception in listener[{}]: {}", this.listenerName, e.getMessage(), e);
        }
    }

    /**
     * Logs a {@code UtilsException} raised while processing a message and, for the error types
     * {@code NO_HANDLER_FOR_MESSAGE_TYPE} and {@code INVALID_JSON}, deletes the message from the queue.
     *
     * @param message        the message being processed when the exception occurred
     * @param utilsException the exception to report
     */
    private void handleUtilsException(Message message, UtilsException utilsException) {
        String errorType = utilsException.getErrorType();
        boolean discardMessage = "NO_HANDLER_FOR_MESSAGE_TYPE".equalsIgnoreCase(errorType)
                || "INVALID_JSON".equalsIgnoreCase(errorType);
        log.error("Exception in listener[{}]: {}", this.listenerName, utilsException.getMessage(), utilsException);
        if (discardMessage) {
            this.syncSqsMessageClient.deleteMessage(this.queueUrl, message.receiptHandle());
        }
    }

    /**
     * Parses a message body into an {@code SqsMessage}.
     *
     * <p>Bodies containing {@code "Type" : "Notification"} are delegated to
     * {@code processSnsNotification}. Any other body is deserialized directly as an
     * {@code SqsMessage}; in that case the attribute map and task input of the result are null.
     *
     * @param str  raw message body
     * @param str2 receipt handle of the message, forwarded to {@code processSnsNotification}
     * @return tuple of (message, attributes or null, task input or null)
     */
    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> constructSqsMessage(String str, String str2) {
        if (str.contains("\"Type\" : \"Notification\"")) {
            return processSnsNotification(str, str2);
        }
        SqsMessage<?> sqsMessage = (SqsMessage<?>) Utils.constructFromJson(SqsMessage.class, str, th -> new UtilsException("INVALID_JSON", th));
        // Untyped nulls let the tuple's type arguments be inferred from the declared return type.
        return Tuple.of(sqsMessage, null, null);
    }

    /**
     * Unwraps an SNS notification body into the contained {@code SqsMessage}.
     *
     * <p>When the inner message contains {@code "Input"} it is read as a {@code TaskInput} and its
     * input becomes the message; a task input without input causes the SQS message to be deleted
     * and an {@code EMPTY_MESSAGE_BODY} exception. Otherwise the inner message is read directly as an
     * {@code SqsMessage}.
     *
     * @param str  raw message body holding the SNS notification
     * @param str2 receipt handle, used when the message has to be deleted
     * @return tuple of (message, notification attributes or null when there are none, task input or null)
     */
    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> processSnsNotification(String str, String str2) {
        SnsSubscriptionMessage notification = (SnsSubscriptionMessage) Utils.constructFromJson(SnsSubscriptionMessage.class, str);
        String payload = notification.getMessage();
        Map<String, MessageAttribute> notificationAttributes = notification.getMessageAttributes();
        TaskInput taskInput = null;
        SqsMessage sqsMessage;
        if (!payload.contains("\"Input\"")) {
            sqsMessage = (SqsMessage) Utils.constructFromJson(SqsMessage.class, payload, th -> new UtilsException("INVALID_JSON", th));
        } else {
            taskInput = (TaskInput) Utils.constructFromJson(TaskInput.class, payload);
            if (taskInput.getInput() == null) {
                // Nothing to process: drop the message before reporting the problem.
                this.syncSqsMessageClient.deleteMessage(this.queueUrl, str2);
                throw new UtilsException("EMPTY_MESSAGE_BODY", "Empty sqs message body");
            }
            sqsMessage = taskInput.getInput();
        }
        return Tuple.of(
                sqsMessage,
                CollectionUtils.isEmpty(notificationAttributes)
                        ? null
                        : (Map) notificationAttributes.entrySet().stream()
                                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getValue())),
                taskInput);
    }

    /**
     * Issues a single receive-message request against the listener's queue, asking for all
     * attributes and message attributes, at most {@code MAX_NUMBER_OF_SQS_MESSAGES} messages and the
     * configured wait time.
     *
     * @return the messages contained in the response
     */
    private List<Message> receiveMessages() {
        ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                .queueUrl(this.queueUrl)
                .attributeNames(QueueAttributeName.ALL)
                .messageAttributeNames("All")
                .maxNumberOfMessages(MAX_NUMBER_OF_SQS_MESSAGES)
                .waitTimeSeconds(this.waitTimeInSeconds)
                .build();
        return this.sqsSyncClient.receiveMessage(request).messages();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a new builder for listener instances.
     *
     * @return a fresh {@code SqsMessageListenerBuilder}
     */
    public static SqsMessageListener.Builder builder() {
        final SqsMessageListener.Builder listenerBuilder = new SqsMessageListenerBuilder();
        return listenerBuilder;
    }
}
