package io.basestar.event.sqs;

import com.google.common.io.BaseEncoding;
import io.basestar.event.Event;
import io.basestar.event.EventSerialization;
import io.basestar.event.Handler;
import io.basestar.event.Receiver;
import io.basestar.storage.Stash;
import io.basestar.util.Nullsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:io/basestar/event/sqs/SQSReceiver.class */
public class SQSReceiver implements Receiver {
    public static final String EVENT_ATTRIBUTE = "event";
    public static final String OVERSIZE_ATTRIBUTE = "oversize";
    public static final String ALL_ATTRIBUTES = "All";
    private static final int WAIT_SECONDS = 20;
    private static final int READ_COUNT = 10;
    private final SqsAsyncClient client;
    private final String queueUrl;
    private final EventSerialization serialization;
    private final Stash oversizeStash;
    private final boolean deleteOversize;
    private static final Logger log = LoggerFactory.getLogger(SQSReceiver.class);
    private static final BaseEncoding BASE_ENCODING = BaseEncoding.base64();

    /* loaded from: input_file:io/basestar/event/sqs/SQSReceiver$Builder.class */
    public static class Builder {
        private SqsAsyncClient client;
        private String queueUrl;
        private EventSerialization serialization;
        private Stash oversizeStash;
        private Boolean deleteOversize;

        public SQSReceiver build() {
            return new SQSReceiver(this);
        }

        public Builder setClient(SqsAsyncClient sqsAsyncClient) {
            this.client = sqsAsyncClient;
            return this;
        }

        public Builder setQueueUrl(String str) {
            this.queueUrl = str;
            return this;
        }

        public Builder setSerialization(EventSerialization eventSerialization) {
            this.serialization = eventSerialization;
            return this;
        }

        public Builder setOversizeStash(Stash stash) {
            this.oversizeStash = stash;
            return this;
        }

        public Builder setDeleteOversize(Boolean bool) {
            this.deleteOversize = bool;
            return this;
        }
    }

    public SQSReceiver(Builder builder) {
        this.client = builder.client;
        this.queueUrl = builder.queueUrl;
        this.serialization = (EventSerialization) Nullsafe.orDefault(builder.serialization, EventSerialization.gzipBson());
        this.oversizeStash = builder.oversizeStash;
        this.deleteOversize = ((Boolean) Nullsafe.orDefault(builder.deleteOversize, true)).booleanValue();
    }

    public static Builder builder() {
        return new Builder();
    }

    public CompletableFuture<Integer> receive(Handler<Event> handler) {
        return this.client.receiveMessage((ReceiveMessageRequest) ReceiveMessageRequest.builder().waitTimeSeconds(Integer.valueOf(WAIT_SECONDS)).maxNumberOfMessages(Integer.valueOf(READ_COUNT)).queueUrl(this.queueUrl).messageAttributeNames(new String[]{ALL_ATTRIBUTES}).build()).thenCompose(receiveMessageResponse -> {
            return handle(receiveMessageResponse, (Handler<Event>) handler);
        });
    }

    private CompletableFuture<Integer> handle(ReceiveMessageResponse receiveMessageResponse, Handler<Event> handler) {
        return receiveMessageResponse.messages().isEmpty() ? CompletableFuture.completedFuture(0) : CompletableFuture.allOf((CompletableFuture[]) receiveMessageResponse.messages().stream().map(message -> {
            return handle(message, (Handler<Event>) handler);
        }).toArray(i -> {
            return new CompletableFuture[i];
        })).thenApply(r3 -> {
            return Integer.valueOf(receiveMessageResponse.messages().size());
        });
    }

    private CompletableFuture<?> handle(Message message, Handler<Event> handler) {
        Map messageAttributes = message.messageAttributes();
        String stringValue = ((MessageAttributeValue) messageAttributes.get(EVENT_ATTRIBUTE)).stringValue();
        HashMap hashMap = new HashMap();
        messageAttributes.forEach((str, messageAttributeValue) -> {
            if ("String".equals(messageAttributeValue.dataType())) {
                hashMap.put(str, messageAttributeValue.stringValue());
            }
        });
        try {
            Class<?> cls = Class.forName(stringValue);
            if (!Event.class.isAssignableFrom(cls)) {
                throw new IllegalStateException();
            }
            MessageAttributeValue messageAttributeValue2 = (MessageAttributeValue) messageAttributes.get(OVERSIZE_ATTRIBUTE);
            if (messageAttributeValue2 != null) {
                String stringValue2 = messageAttributeValue2.stringValue();
                return this.oversizeStash.read(stringValue2).thenCompose(bArr -> {
                    CompletableFuture<?> handle = handle(message, this.serialization.deserialize(cls, bArr), hashMap, handler);
                    return this.deleteOversize ? handle.thenCompose(obj -> {
                        return this.oversizeStash.delete(stringValue2);
                    }) : handle.thenApply(obj2 -> {
                        return null;
                    });
                });
            }
            return handle(message, this.serialization.deserialize(cls, BASE_ENCODING.decode(message.body())), hashMap, handler);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private CompletableFuture<?> handle(Message message, Event event, Map<String, String> map, Handler<Event> handler) {
        return handler.handle(event, map).thenCompose(obj -> {
            return this.client.deleteMessage((DeleteMessageRequest) DeleteMessageRequest.builder().queueUrl(this.queueUrl).receiptHandle(message.receiptHandle()).build());
        });
    }
}
