package org.restheart.mongodb.handlers.changestreams;

import io.undertow.Handlers;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.AttachmentKey;
import io.undertow.util.HeaderValues;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.bson.BsonDocument;
import org.bson.json.JsonMode;
import org.restheart.exchange.InvalidMetadataException;
import org.restheart.exchange.MongoRequest;
import org.restheart.exchange.MongoResponse;
import org.restheart.exchange.QueryNotFoundException;
import org.restheart.exchange.QueryVariableNotBoundException;
import org.restheart.handlers.PipelinedHandler;
import org.restheart.mongodb.utils.StagesInterpolator;
import org.restheart.mongodb.utils.VarsInterpolator;
import org.restheart.utils.ThreadsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/GetChangeStreamHandler.class */
public class GetChangeStreamHandler extends PipelinedHandler {
    private final String CONNECTION_HEADER_KEY = "connection";
    private final String CONNECTION_HEADER_VALUE = "upgrade";
    private final String UPGRADE_HEADER_KEY = "upgrade";
    private final String UPGRADE_HEADER_VALUE = "websocket";
    private static final Logger LOGGER = LoggerFactory.getLogger(GetChangeStreamHandler.class);
    private static final HttpHandler WEBSOCKET_HANDLER = Handlers.websocket((webSocketHttpExchange, webSocketChannel) -> {
        ChangeStreamWorkerKey changeStreamWorkerKey = new ChangeStreamWorkerKey(webSocketHttpExchange);
        Optional<ChangeStreamWorker> optional = ChangeStreamWorkers.getInstance().get(changeStreamWorkerKey);
        if (!optional.isPresent()) {
            LOGGER.error("Cannot find Change Stream Worker changeStreamKey={}", changeStreamWorkerKey);
            try {
                webSocketChannel.close();
            } catch (IOException e) {
            }
        } else {
            ChangeStreamWorker changeStreamWorker = optional.get();
            WebSocketSession webSocketSession = new WebSocketSession(webSocketChannel, changeStreamWorker);
            changeStreamWorker.websocketSessions().add(webSocketSession);
            LOGGER.debug("New Change Stream WebSocket session, sessionkey={} for changeStreamKey={}", webSocketSession.getId(), changeStreamWorkerKey);
        }
    });
    public static final AttachmentKey<BsonDocument> AVARS_ATTACHMENT_KEY = AttachmentKey.create(BsonDocument.class);
    public static final AttachmentKey<JsonMode> JSON_MODE_ATTACHMENT_KEY = AttachmentKey.create(JsonMode.class);

    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        MongoRequest of = MongoRequest.of(httpServerExchange);
        MongoResponse of2 = MongoResponse.of(httpServerExchange);
        if (of.isInError()) {
            next(httpServerExchange);
            return;
        }
        try {
            if (isWebSocketHandshakeRequest(httpServerExchange)) {
                httpServerExchange.putAttachment(JSON_MODE_ATTACHMENT_KEY, of.getJsonMode());
                BsonDocument aggregationVars = of.getAggregationVars();
                if (aggregationVars == null) {
                    aggregationVars = new BsonDocument();
                }
                StagesInterpolator.injectAvars(of, aggregationVars);
                httpServerExchange.putAttachment(AVARS_ATTACHMENT_KEY, aggregationVars);
                initChangeStreamWorker(httpServerExchange);
                WEBSOCKET_HANDLER.handleRequest(httpServerExchange);
            } else {
                of2.setInError(400, "Change Stream requires WebSocket, no 'Upgrade' or 'Connection' request header found");
                next(httpServerExchange);
            }
        } catch (QueryNotFoundException e) {
            of2.setInError(404, "Change Stream does not exist");
            LOGGER.debug("Requested Change Stream {} does not exist", of.getUnmappedRequestUri());
            next(httpServerExchange);
        } catch (IllegalStateException e2) {
            if (e2.getMessage() == null || !e2.getMessage().contains("transport does not support HTTP upgrade")) {
                return;
            }
            LOGGER.warn("Cannot open change stream: the AJP listener does not support WebSocket");
            of2.setInError(500, "Cannot open change stream: the AJP listener does not support WebSocket");
        } catch (QueryVariableNotBoundException e3) {
            of2.setInError(400, e3.getMessage());
            LOGGER.warn("Cannot open change stream, the request does not specify the required variables in the avars query paramter: {}", e3.getMessage());
            next(httpServerExchange);
        } catch (Throwable th) {
            LOGGER.error("Error handling the Change Stream request", th);
            of2.setInError(500, th.getMessage());
        }
    }

    private boolean isWebSocketHandshakeRequest(HttpServerExchange httpServerExchange) {
        HeaderValues headerValues = httpServerExchange.getRequestHeaders().get("connection");
        HeaderValues headerValues2 = httpServerExchange.getRequestHeaders().get("upgrade");
        return headerValues != null && headerValues2 != null && Arrays.stream(headerValues.toArray()).anyMatch(str -> {
            return str.toLowerCase().contains("upgrade");
        }) && Arrays.stream(headerValues2.toArray()).anyMatch(str2 -> {
            return str2.toLowerCase().contains("websocket");
        });
    }

    private List<BsonDocument> getResolvedStagesAsList(MongoRequest mongoRequest) throws InvalidMetadataException, QueryVariableNotBoundException, QueryNotFoundException {
        String changeStreamOperation = mongoRequest.getChangeStreamOperation();
        Optional<ChangeStreamOperation> findFirst = ChangeStreamOperation.getFromJson(mongoRequest.getCollectionProps()).stream().filter(changeStreamOperation2 -> {
            return changeStreamOperation2.getUri().equals(changeStreamOperation);
        }).findFirst();
        if (!findFirst.isPresent()) {
            throw new QueryNotFoundException("Stream " + mongoRequest.getUnmappedRequestUri() + "  does not exist");
        }
        ChangeStreamOperation changeStreamOperation3 = findFirst.get();
        return StagesInterpolator.interpolate(VarsInterpolator.VAR_OPERATOR.$var, StagesInterpolator.STAGE_OPERATOR.$ifvar, changeStreamOperation3.getStages(), (BsonDocument) mongoRequest.getExchange().getAttachment(AVARS_ATTACHMENT_KEY));
    }

    private synchronized void initChangeStreamWorker(HttpServerExchange httpServerExchange) throws QueryVariableNotBoundException, QueryNotFoundException, InvalidMetadataException {
        ChangeStreamWorkerKey changeStreamWorkerKey = new ChangeStreamWorkerKey(httpServerExchange);
        MongoRequest of = MongoRequest.of(httpServerExchange);
        List<BsonDocument> resolvedStagesAsList = getResolvedStagesAsList(of);
        if (!ChangeStreamWorkers.getInstance().get(changeStreamWorkerKey).isEmpty()) {
            LOGGER.debug("Change Stream Worker already exists, {}", changeStreamWorkerKey);
            return;
        }
        ChangeStreamWorker changeStreamWorker = new ChangeStreamWorker(changeStreamWorkerKey, resolvedStagesAsList, of.getDBName(), of.getCollectionName());
        ChangeStreamWorkers.getInstance().put(changeStreamWorker);
        ThreadsUtils.virtualThreadsExecutor().execute(changeStreamWorker);
        LOGGER.debug("Started Change Stream Worker, {}", changeStreamWorkerKey);
    }
}
