package org.restheart.mongodb.handlers.changestreams;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.List;
import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DocumentCodec;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.restheart.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/restheart/mongodb/handlers/changestreams/ChangeStreamSubscriber.class */
public class ChangeStreamSubscriber implements Subscriber<ChangeStreamDocument> {
    private final SessionKey sessionKey;
    private Subscription sub;
    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamSubscriber.class);
    private static final CodecRegistry REGISTRY = CodecRegistries.fromCodecs(new Codec[]{new DocumentCodec()});

    public ChangeStreamSubscriber(SessionKey sessionKey) {
        this.sessionKey = sessionKey;
    }

    public void onSubscribe(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
        this.sub = subscription;
    }

    public void onNext(ChangeStreamDocument changeStreamDocument) {
        if (GuavaHashMultimapSingleton.get(this.sessionKey).isEmpty()) {
            stop();
            LOGGER.debug("Closing unwatched stream, sessionKey=" + this.sessionKey);
            GetChangeStreamHandler.OPENED_STREAMS.remove(this.sessionKey);
        } else {
            LOGGER.trace("[clients watching]: " + GuavaHashMultimapSingleton.get(this.sessionKey).size());
            LOGGER.debug("Change stream notification for sessionKey={}: {}", this.sessionKey, changeStreamDocument);
            ChangeStreamWebsocketCallback.NOTIFICATION_PUBLISHER.submit(new ChangeStreamNotification(this.sessionKey, JsonUtils.toJson(getDocument(changeStreamDocument), this.sessionKey.getJsonMode())));
        }
    }

    public void onError(Throwable th) {
        LOGGER.warn("Error from stream: " + th.getMessage());
    }

    public void onComplete() {
        LOGGER.debug("Stream completed, sessionKey=" + this.sessionKey);
    }

    public void stop() {
        this.sub.cancel();
    }

    private BsonDocument getDocument(ChangeStreamDocument changeStreamDocument) {
        BsonDocument bsonDocument = new BsonDocument();
        if (changeStreamDocument == null) {
            return bsonDocument;
        }
        bsonDocument.put("fullDocument", toBson((Document) changeStreamDocument.getFullDocument()));
        bsonDocument.put("documentKey", changeStreamDocument.getDocumentKey());
        if (changeStreamDocument.getUpdateDescription() != null) {
            BsonDocument bsonDocument2 = new BsonDocument();
            BsonDocument updatedFields = changeStreamDocument.getUpdateDescription().getUpdatedFields();
            if (updatedFields != null) {
                bsonDocument2.put("updatedFields", updatedFields);
            } else {
                bsonDocument2.put("updatedFields", BsonNull.VALUE);
            }
            List removedFields = changeStreamDocument.getUpdateDescription().getRemovedFields();
            if (removedFields == null) {
                bsonDocument2.put("updatedFields", new BsonArray());
            } else {
                BsonArray bsonArray = new BsonArray();
                removedFields.forEach(str -> {
                    bsonArray.add(new BsonString(str));
                });
                bsonDocument2.put("removedFields", bsonArray);
            }
            bsonDocument.put("updateDescription", bsonDocument2);
        } else {
            bsonDocument.put("updateDescription", BsonNull.VALUE);
        }
        bsonDocument.put("operationType", new BsonString(changeStreamDocument.getOperationType().getValue()));
        return bsonDocument;
    }

    private static BsonValue toBson(Document document) {
        return document == null ? BsonNull.VALUE : document.toBsonDocument(BsonDocument.class, REGISTRY);
    }
}
