package io.pythagoras.messagebus.adapter.awssnssqs;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.google.gson.Gson;
import io.pythagoras.messagebus.core.BaseRunTimeException;
import io.pythagoras.messagebus.core.HandleMessageFailureException;
import io.pythagoras.messagebus.core.IMessageBus;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pythagoras/messagebus/adapter/awssnssqs/SqsConsumer.class */
public class SqsConsumer {
    private static final Logger logger = LoggerFactory.getLogger(SqsConsumer.class);
    private AmazonSQS sqs;
    private IMessageBus bus;
    private String queueUrl;
    private Integer parallelReceivers = 1;
    private boolean started = false;
    private ScheduledExecutorService executor;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pythagoras/messagebus/adapter/awssnssqs/SqsConsumer$SQSMessage.class */
    public class SQSMessage {
        String MessageId;
        String Type;
        String TopicArn;
        String Message;
        String Timestamp;
        SQSMessageAttributes MessageAttributes;

        SQSMessage() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pythagoras/messagebus/adapter/awssnssqs/SqsConsumer$SQSMessageAttributeValue.class */
    public class SQSMessageAttributeValue {
        String Type;
        String Value;

        SQSMessageAttributeValue() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/pythagoras/messagebus/adapter/awssnssqs/SqsConsumer$SQSMessageAttributes.class */
    public class SQSMessageAttributes {
        SQSMessageAttributeValue MessageCode;
        SQSMessageAttributeValue MessageVersion;

        SQSMessageAttributes() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SqsConsumer(SQSClientProvider sQSClientProvider, IMessageBus iMessageBus, String str) {
        this.sqs = sQSClientProvider.getSyncClient();
        this.bus = iMessageBus;
        this.queueUrl = str;
    }

    public void setParallelReceivers(Integer num) {
        if (this.started) {
            throw new BaseRunTimeException("Cannot change number of parallel receivers once SQSConsumer is started.");
        }
        if (num.intValue() > 20) {
            throw new BaseRunTimeException("Lets not be crazy. " + num + " parallel receivers is a lot.  Keep it to 20 or less.");
        }
        this.parallelReceivers = num;
    }

    public void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        this.executor = Executors.newScheduledThreadPool(this.parallelReceivers.intValue());
        Runnable runnable = () -> {
            receiveMessages();
        };
        for (int i = 0; i < this.parallelReceivers.intValue(); i++) {
            this.executor.scheduleWithFixedDelay(runnable, 0L, 1L, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        if (this.started) {
            if (this.executor != null) {
                this.executor.shutdown();
            }
            this.started = false;
        }
    }

    private void receiveMessages() {
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
        receiveMessageRequest.setQueueUrl(this.queueUrl);
        receiveMessageRequest.setMaxNumberOfMessages(1);
        receiveMessageRequest.setWaitTimeSeconds(20);
        processReceiveMessageResult(this.sqs.receiveMessage(receiveMessageRequest));
    }

    public void injectMessages(Collection<Message> collection) {
        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
        receiveMessageResult.setMessages(collection);
        processReceiveMessageResult(receiveMessageResult);
    }

    private void processReceiveMessageResult(ReceiveMessageResult receiveMessageResult) {
        for (Message message : receiveMessageResult.getMessages()) {
            try {
                handleMessage(message);
                this.sqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, message.getReceiptHandle()));
            } catch (Exception e) {
                logger.error("Failure to handle message in receive. " + message.getBody(), e);
            }
        }
    }

    private void handleMessage(Message message) {
        SQSMessage sQSMessage = (SQSMessage) new Gson().fromJson(message.getBody(), SQSMessage.class);
        if (sQSMessage == null) {
            throw new HandleMessageFailureException("Unable to unwrap message.  Consumption is stopped.");
        }
        if (sQSMessage.MessageAttributes == null || sQSMessage.MessageAttributes.MessageCode == null || sQSMessage.MessageAttributes.MessageCode.Value == null) {
            throw new HandleMessageFailureException("Unable to extract MessageCode.  Consumption is stopped.");
        }
        if (sQSMessage.MessageAttributes.MessageVersion == null || sQSMessage.MessageAttributes.MessageVersion.Value == null) {
            throw new HandleMessageFailureException("Unable to extract MessageVersion.  Consumption is stopped.");
        }
        String str = sQSMessage.MessageAttributes.MessageCode.Value;
        Integer valueOf = Integer.valueOf(sQSMessage.MessageAttributes.MessageVersion.Value);
        BusMessage busMessage = new BusMessage();
        busMessage.setCode(str);
        busMessage.setVersion(valueOf);
        busMessage.setPayload(sQSMessage.Message);
        busMessage.setSentTime(formatTimestamp(sQSMessage.Timestamp));
        this.bus.receiveMessage(busMessage);
    }

    static Date formatTimestamp(String str) {
        try {
            return new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").parse(str);
        } catch (ParseException e) {
            throw new HandleMessageFailureException("Unable to parse timestamp: " + str, e);
        }
    }
}
