package org.apache.pulsar.io.mongodb;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name = "mongo", type = IOType.SOURCE, help = "A source connector that sends mongodb documents to pulsar", configClass = MongoSourceConfig.class)
/* loaded from: input_file:org/apache/pulsar/io/mongodb/MongoSource.class */
public class MongoSource extends PushSource<byte[]> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MongoSource.class);
    private final Supplier<MongoClient> clientProvider;
    private MongoSourceConfig mongoSourceConfig;
    private MongoClient mongoClient;
    private ChangeStreamPublisher<Document> stream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/io/mongodb/MongoSource$DocRecord.class */
    public static class DocRecord implements Record<byte[]> {
        private final Optional<String> key;
        private final byte[] value;

        @Generated
        public DocRecord(Optional<String> optional, byte[] bArr) {
            this.key = optional;
            this.value = bArr;
        }

        @Override // org.apache.pulsar.functions.api.Record
        @Generated
        public Optional<String> getKey() {
            return this.key;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.api.Record
        @Generated
        public byte[] getValue() {
            return this.value;
        }

        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof DocRecord)) {
                return false;
            }
            DocRecord docRecord = (DocRecord) obj;
            if (!docRecord.canEqual(this)) {
                return false;
            }
            Optional<String> key = getKey();
            Optional<String> key2 = docRecord.getKey();
            if (key == null) {
                if (key2 != null) {
                    return false;
                }
            } else if (!key.equals(key2)) {
                return false;
            }
            return Arrays.equals(getValue(), docRecord.getValue());
        }

        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof DocRecord;
        }

        @Generated
        public int hashCode() {
            Optional<String> key = getKey();
            return (((1 * 59) + (key == null ? 43 : key.hashCode())) * 59) + Arrays.hashCode(getValue());
        }

        @Generated
        public String toString() {
            return "MongoSource.DocRecord(key=" + getKey() + ", value=" + Arrays.toString(getValue()) + ")";
        }
    }

    public MongoSource() {
        this(null);
    }

    public MongoSource(Supplier<MongoClient> supplier) {
        this.clientProvider = supplier;
    }

    @Override // org.apache.pulsar.io.core.PushSource, org.apache.pulsar.io.core.Source
    public void open(Map<String, Object> map, SourceContext sourceContext) throws Exception {
        log.info("Open MongoDB Source");
        this.mongoSourceConfig = MongoSourceConfig.load(map, sourceContext);
        this.mongoSourceConfig.validate();
        if (this.clientProvider != null) {
            this.mongoClient = this.clientProvider.get();
        } else {
            this.mongoClient = MongoClients.create(this.mongoSourceConfig.getMongoUri());
        }
        String database = this.mongoSourceConfig.getDatabase();
        if (StringUtils.isEmpty(database)) {
            log.info("Watch all databases");
            this.stream = this.mongoClient.watch();
        } else {
            MongoDatabase database2 = this.mongoClient.getDatabase(database);
            String collection = this.mongoSourceConfig.getCollection();
            if (StringUtils.isEmpty(collection)) {
                log.info("Watch db: {}", database2.getName());
                this.stream = database2.watch();
            } else {
                MongoCollection<Document> collection2 = database2.getCollection(collection);
                log.info("Watch collection: {}.{}", database2.getName(), collection);
                this.stream = collection2.watch();
            }
        }
        this.stream.batchSize(this.mongoSourceConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
        if (this.mongoSourceConfig.getSyncType() == SyncType.FULL_SYNC) {
            this.stream.startAtOperationTime(new BsonTimestamp(0L));
        }
        this.stream.subscribe(new Subscriber<ChangeStreamDocument<Document>>() { // from class: org.apache.pulsar.io.mongodb.MongoSource.1
            private ObjectMapper mapper = new ObjectMapper();
            private Subscription subscription;

            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                this.subscription.request(2147483647L);
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(ChangeStreamDocument<Document> changeStreamDocument) {
                try {
                    MongoSource.log.info("New change doc: {}", changeStreamDocument);
                    BsonDocument documentKey = changeStreamDocument.getDocumentKey();
                    if (documentKey == null) {
                        MongoSource.log.warn("The document key is null");
                        return;
                    }
                    HashMap hashMap = new HashMap();
                    hashMap.put("fullDocument", changeStreamDocument.getFullDocument());
                    hashMap.put("ns", changeStreamDocument.getNamespace());
                    hashMap.put("operation", changeStreamDocument.getOperationType());
                    MongoSource.this.consume(new DocRecord(Optional.of(documentKey.toJson()), this.mapper.writeValueAsString(hashMap).getBytes(StandardCharsets.UTF_8)));
                } catch (JsonProcessingException e) {
                    MongoSource.log.error("Processing doc from mongo", (Throwable) e);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                MongoSource.log.error("Subscriber error", th);
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                MongoSource.log.info("Subscriber complete");
            }
        });
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }
}
