package io.pythagoras.messagebus.adapter.awssnssqs;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.model.AuthorizationErrorException;
import com.amazonaws.services.sns.model.EndpointDisabledException;
import com.amazonaws.services.sns.model.InternalErrorException;
import com.amazonaws.services.sns.model.InvalidParameterException;
import com.amazonaws.services.sns.model.InvalidParameterValueException;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.NotFoundException;
import com.amazonaws.services.sns.model.PlatformApplicationDisabledException;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteQueueRequest;
import io.pythagoras.messagebus.adapter.awssnssqs.config.Properties;
import io.pythagoras.messagebus.adapter.awssnssqs.exceptions.SNSNotExistsException;
import io.pythagoras.messagebus.annotations.MessageBusAdapter;
import io.pythagoras.messagebus.core.BaseRunTimeException;
import io.pythagoras.messagebus.core.IBusMessage;
import io.pythagoras.messagebus.core.IMessageBus;
import io.pythagoras.messagebus.core.IMessageBusAdapter;
import io.pythagoras.messagebus.core.IMessageContract;
import io.pythagoras.messagebus.core.MessageBusInitializationException;
import io.pythagoras.messagebus.core.MessageBusStateException;
import io.pythagoras.messagebus.core.MessageSendingException;
import io.pythagoras.messagebus.core.config.MessageBusProperties;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;

/**
 * Message bus adapter that publishes messages through an Amazon SNS client and
 * receives them through {@link SqsConsumer} instances bound to SQS queue URLs.
 *
 * <p>Collaborators are injected through the {@code @Autowired} setters below. The queue
 * URLs are only known after {@link #initialize(List, List, Map)} has run with the message
 * bus enabled; until then {@code queueUrl} and both dedicated-queue maps are {@code null}.
 */
@MessageBusAdapter
public class AwsSnsSqsAdapter implements IMessageBusAdapter {
    private static final Logger logger = LoggerFactory.getLogger(AwsSnsSqsAdapter.class);

    private AmazonSQS sqs;
    private SQSClientProvider sqsClientProvider;
    private AmazonSNS sns;
    private SNSRegistry snsRegistry;
    private Properties properties;
    private String appName;
    /** URL of the queue obtained from {@code Initializer#getQueueUrl()}; {@code null} before initialization. */
    private String queueUrl;
    /** Queue URLs obtained from {@code Initializer#getCodeToDedicatedQueuePerServiceUrl()}; {@code null} before initialization. */
    private Map<String, String> codeToDedicatedQueuePerServiceUrl;
    /** Queue URLs obtained from {@code Initializer#getCodeToDedicatedQueuePerInstanceUrl()}; {@code null} before initialization. */
    private Map<String, String> codeToDedicatedQueuePerInstanceUrl;
    private MessageBusProperties messageBusProperties;
    /** Consumers created by {@link #registerMessageBusService(IMessageBus)}. */
    private final List<SqsConsumer> sqsConsumers = new ArrayList<>();

    /**
     * Stores the SQS client provider and caches its synchronous client.
     *
     * @param sQSClientProvider provider of SQS clients
     */
    @Autowired
    public void setSQS(SQSClientProvider sQSClientProvider) {
        this.sqsClientProvider = sQSClientProvider;
        this.sqs = sQSClientProvider.getSyncClient();
    }

    /**
     * Caches the synchronous SNS client of the given provider.
     *
     * @param sNSClientProvider provider of SNS clients
     */
    @Autowired
    public void setSNS(SNSClientProvider sNSClientProvider) {
        this.sns = sNSClientProvider.getSyncClient();
    }

    /**
     * @param sNSRegistry registry used to resolve a message code to a topic ARN
     */
    @Autowired
    public void setSnsRegistry(SNSRegistry sNSRegistry) {
        this.snsRegistry = sNSRegistry;
    }

    /**
     * @param properties adapter configuration (read here for the parallel receiver count)
     */
    @Autowired
    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    /**
     * @param messageBusProperties core message bus configuration (enabled / receive-enabled flags)
     */
    @Autowired
    public void setMessageBusProperties(MessageBusProperties messageBusProperties) {
        this.messageBusProperties = messageBusProperties;
    }

    /**
     * @param str application name, injected from {@code spring.application.name}
     */
    @Required
    @Value("${spring.application.name}")
    public void setAppName(String str) {
        this.appName = str;
    }

    /**
     * Delegates to a new {@code Initializer} and records the queue URLs it reports.
     * Does nothing when the message bus is disabled, in which case the queue URL
     * fields stay {@code null}.
     *
     * @param list  passed through unchanged as the first argument of {@code Initializer#initialize}
     * @param list2 passed through unchanged as the second argument of {@code Initializer#initialize}
     * @param map   passed through unchanged as the third argument of {@code Initializer#initialize}
     * @throws MessageBusInitializationException declared by this method; presumably raised by the
     *                                           initializer (not visible here)
     */
    public void initialize(List<String> list, List<String> list2, Map<String, List<Class<IMessageContract>>> map) throws MessageBusInitializationException {
        if (!this.messageBusProperties.isEnabled()) {
            return;
        }
        Initializer initializer = new Initializer(this.snsRegistry, this.properties, this.sns, this.sqs);
        initializer.initialize(list, list2, map, this.appName);
        this.queueUrl = initializer.getQueueUrl();
        this.codeToDedicatedQueuePerServiceUrl = initializer.getCodeToDedicatedQueuePerServiceUrl();
        this.codeToDedicatedQueuePerInstanceUrl = initializer.getCodeToDedicatedQueuePerInstanceUrl();
    }

    /**
     * Publishes the message payload to the SNS topic registered for the message code,
     * attaching the code and version as the {@code MessageCode} and {@code MessageVersion}
     * string attributes.
     *
     * @param iBusMessage message to publish
     * @throws MessageSendingException if the bus is disabled, no topic is registered for the
     *                                 message code, or SNS rejects the publish request with one
     *                                 of the handled SNS exceptions
     */
    public void sendMessage(IBusMessage iBusMessage) throws MessageSendingException {
        if (!this.messageBusProperties.isEnabled()) {
            throw new MessageSendingException("Message Bus is Disabled.  Unable to send messages.");
        }
        try {
            String topicArn = this.snsRegistry.getARN(iBusMessage.getCode());

            Map<String, MessageAttributeValue> attributes = new HashMap<>();
            attributes.put("MessageCode", createAttributeValue(iBusMessage.getCode()));
            attributes.put("MessageVersion", createAttributeValue(iBusMessage.getVersion().toString()));

            PublishRequest publishRequest = new PublishRequest();
            publishRequest.setTopicArn(topicArn);
            publishRequest.setMessage(iBusMessage.getPayload());
            publishRequest.setMessageAttributes(attributes);

            this.sns.publish(publishRequest);
        } catch (SNSNotExistsException e) {
            throw new MessageSendingException("Unable to locate exchange. ", e);
        } catch (InvalidParameterException | InvalidParameterValueException | InternalErrorException | NotFoundException | EndpointDisabledException | PlatformApplicationDisabledException | AuthorizationErrorException e) {
            throw new MessageSendingException("Unable to send message.", e);
        }
    }

    /**
     * @return the live internal list of consumers created so far (not a copy)
     */
    public List<SqsConsumer> getConsumers() {
        return this.sqsConsumers;
    }

    /** Builds an SNS message attribute of data type {@code String} holding the given value. */
    private MessageAttributeValue createAttributeValue(String str) {
        MessageAttributeValue attribute = new MessageAttributeValue();
        attribute.setDataType("String");
        attribute.setStringValue(str);
        return attribute;
    }

    /**
     * Creates one {@link SqsConsumer} for the main queue (when present) and one for every
     * dedicated per-service and per-instance queue. Does nothing unless the message bus is
     * both enabled and receive-enabled.
     *
     * @param iMessageBus message bus handed to every created consumer
     * @throws BaseRunTimeException if no queue URL is known, i.e. the adapter was not initialized
     */
    public void registerMessageBusService(IMessageBus iMessageBus) {
        if (!this.messageBusProperties.isEnabled() || !this.messageBusProperties.isReceiveEnabled()) {
            return;
        }
        // The maps are null until initialize() has run, so the emptiness checks must be null-safe;
        // otherwise the uninitialized case surfaces as a NullPointerException instead of the
        // descriptive exception below.
        boolean hasMainQueue = this.queueUrl != null;
        boolean hasServiceQueues = this.codeToDedicatedQueuePerServiceUrl != null
                && !this.codeToDedicatedQueuePerServiceUrl.isEmpty();
        boolean hasInstanceQueues = this.codeToDedicatedQueuePerInstanceUrl != null
                && !this.codeToDedicatedQueuePerInstanceUrl.isEmpty();
        if (!hasMainQueue && !hasServiceQueues && !hasInstanceQueues) {
            throw new BaseRunTimeException("Adapter must be initialized prior to registration of message handler.");
        }

        if (hasMainQueue) {
            addConsumer(iMessageBus, this.queueUrl);
        }
        if (hasServiceQueues) {
            for (String url : this.codeToDedicatedQueuePerServiceUrl.values()) {
                addConsumer(iMessageBus, url);
            }
        }
        if (hasInstanceQueues) {
            for (String url : this.codeToDedicatedQueuePerInstanceUrl.values()) {
                addConsumer(iMessageBus, url);
            }
        }
    }

    /** Creates a consumer for the given queue URL, applies the configured parallel receiver count, and records it. */
    private void addConsumer(IMessageBus messageBus, String url) {
        SqsConsumer consumer = new SqsConsumer(this.sqsClientProvider, messageBus, url);
        consumer.setParallelReceivers(this.properties.getParallelReceivers());
        this.sqsConsumers.add(consumer);
    }

    /**
     * Stops every registered consumer, skipping {@code null} entries.
     *
     * @throws MessageBusStateException declared by this method; presumably raised by a consumer
     *                                  (not visible here)
     */
    public void stop() throws MessageBusStateException {
        for (SqsConsumer consumer : this.sqsConsumers) {
            if (consumer != null) {
                consumer.stop();
            }
        }
    }

    /**
     * Starts every registered consumer in registration order.
     *
     * @throws MessageBusStateException if a registered consumer is {@code null}
     */
    public void start() throws MessageBusStateException {
        for (SqsConsumer consumer : this.sqsConsumers) {
            if (consumer == null) {
                throw new MessageBusStateException("Consumer is null.");
            }
            consumer.start();
        }
    }

    /**
     * Deletes every dedicated per-instance queue on a best-effort basis: a runtime failure
     * while deleting one queue is logged and the remaining queues are still attempted.
     * Runtime exceptions from the deletions are never propagated to the caller.
     *
     * @throws MessageBusStateException declared for interface compatibility; not thrown by this
     *                                  implementation
     */
    public void cleanResources() throws MessageBusStateException {
        logger.info("Starting removal of dedicated queues per instance.");

        // The map is null when the adapter was never initialized; treat that as "nothing to delete".
        List<String> queueUrls = new ArrayList<>();
        if (this.codeToDedicatedQueuePerInstanceUrl != null) {
            queueUrls.addAll(this.codeToDedicatedQueuePerInstanceUrl.values());
        }

        logger.info("Number of queues to be deleted: {}.", queueUrls.size());
        for (String url : queueUrls) {
            logger.info("Queue url for removal: {}.", url);
        }

        int failures = 0;
        for (String url : queueUrls) {
            logger.info("Deleting queue: {}.", url);
            try {
                this.sqs.deleteQueue(new DeleteQueueRequest(url));
            } catch (RuntimeException e) {
                // Keep going: one undeletable queue must not leave the others behind.
                failures++;
                logger.error("Failed to delete queue: {}.", url, e);
            }
        }

        if (failures == 0) {
            logger.info("Removal of dedicated queues per instance completed without exceptions.");
        } else {
            logger.warn("Removal of dedicated queues per instance completed with exception. Failed deletions: {}.", failures);
        }
    }
}
