package com.mongodb.kafka.connect.source;

import com.mongodb.MongoClientSettings;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.sink.MongoSinkTopicConfig;
import com.mongodb.kafka.connect.util.ConfigHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.BsonString;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/source/MongoSourceTask.class */
public class MongoSourceTask extends SourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
    private static final String INVALIDATE = "invalidate";
    private final Time time;
    private final AtomicBoolean isRunning;
    private MongoSourceConfig sourceConfig;
    private MongoClient mongoClient;
    private final AtomicBoolean isCopying;
    private MongoCopyDataManager copyDataManager;
    private BsonDocument cachedResult;
    private BsonDocument cachedResumeAfter;
    private MongoCursor<BsonDocument> cursor;

    public MongoSourceTask() {
        this(new SystemTime());
    }

    private MongoSourceTask(Time time) {
        this.isRunning = new AtomicBoolean();
        this.isCopying = new AtomicBoolean();
        this.time = time;
    }

    public String version() {
        return Versions.VERSION;
    }

    public void start(Map<String, String> map) {
        LOGGER.debug("Starting MongoDB source task");
        try {
            this.sourceConfig = new MongoSourceConfig(map);
            this.mongoClient = MongoClients.create(this.sourceConfig.getConnectionString(), ConfigHelper.getMongoDriverInformation());
            if (shouldCopyData()) {
                setCachedResultAndResumeToken();
                this.copyDataManager = new MongoCopyDataManager(this.sourceConfig, this.mongoClient);
                this.isCopying.set(true);
            } else {
                this.cursor = createCursor(this.sourceConfig, this.mongoClient);
            }
            this.isRunning.set(true);
            LOGGER.debug("Started MongoDB source task");
        } catch (Exception e) {
            throw new ConnectException("Failed to start new task", e);
        }
    }

    public List<SourceRecord> poll() {
        long milliseconds = this.time.milliseconds();
        LOGGER.debug("Polling Start: {}", Long.valueOf(this.time.milliseconds()));
        ArrayList arrayList = new ArrayList();
        boolean booleanValue = this.sourceConfig.getBoolean(MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG).booleanValue();
        int intValue = this.sourceConfig.getInt(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG).intValue();
        long longValue = milliseconds + this.sourceConfig.getLong(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG).longValue();
        String string = this.sourceConfig.getString(MongoSourceConfig.TOPIC_PREFIX_CONFIG);
        Map<String, Object> createPartitionMap = createPartitionMap(this.sourceConfig);
        while (this.isRunning.get()) {
            Optional<BsonDocument> nextDocument = getNextDocument();
            long milliseconds2 = longValue - this.time.milliseconds();
            if (nextDocument.isPresent()) {
                BsonDocument bsonDocument = nextDocument.get();
                HashMap hashMap = new HashMap();
                hashMap.put(MongoSinkTopicConfig.ID_FIELD, bsonDocument.getDocument(MongoSinkTopicConfig.ID_FIELD).toJson());
                if (this.isCopying.get()) {
                    hashMap.put("copy", "true");
                }
                String topicNameFromNamespace = getTopicNameFromNamespace(string, bsonDocument.getDocument("ns", new BsonDocument()));
                Optional empty = Optional.empty();
                if (!booleanValue) {
                    empty = Optional.of(bsonDocument.toJson());
                } else if (bsonDocument.containsKey("fullDocument")) {
                    empty = Optional.of(bsonDocument.getDocument("fullDocument").toJson());
                }
                empty.ifPresent(str -> {
                    LOGGER.trace("Adding {} to {}", str, topicNameFromNamespace);
                    arrayList.add(new SourceRecord(createPartitionMap, hashMap, topicNameFromNamespace, Schema.STRING_SCHEMA, new BsonDocument(MongoSinkTopicConfig.ID_FIELD, bsonDocument.get(MongoSinkTopicConfig.ID_FIELD)).toJson(), Schema.STRING_SCHEMA, str));
                });
                if (bsonDocument.getString("operationType", new BsonString("")).getValue().equalsIgnoreCase(INVALIDATE)) {
                    LOGGER.info("Cursor has been invalidated.");
                    this.cursor = null;
                    return arrayList;
                }
                if (arrayList.size() == intValue) {
                    LOGGER.debug("Reached max batch size: {}, returning records", Integer.valueOf(intValue));
                    return arrayList;
                }
            } else {
                if (milliseconds2 <= 0) {
                    LOGGER.debug("Poll await time passed before reaching max batch size returning {} records", Integer.valueOf(arrayList.size()));
                    if (arrayList.isEmpty()) {
                        return null;
                    }
                    return arrayList;
                }
                LOGGER.debug("Waiting {} ms to poll", Long.valueOf(milliseconds2));
                this.time.sleep(milliseconds2);
            }
        }
        return null;
    }

    public synchronized void stop() {
        LOGGER.debug("Stopping MongoDB source task");
        this.isRunning.set(false);
        this.isCopying.set(false);
        if (this.copyDataManager != null) {
            this.copyDataManager.close();
            this.copyDataManager = null;
        }
        if (this.cursor != null) {
            this.cursor.close();
            this.cursor = null;
        }
        if (this.mongoClient != null) {
            this.mongoClient.close();
            this.mongoClient = null;
        }
    }

    MongoCursor<BsonDocument> createCursor(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        LOGGER.debug("Creating a MongoCursor");
        ChangeStreamIterable<Document> changeStreamIterable = getChangeStreamIterable(mongoSourceConfig, mongoClient);
        BsonDocument resumeAfter = getResumeAfter(mongoSourceConfig);
        if (resumeAfter != null) {
            LOGGER.info("Resuming the change stream at the previous offset");
            changeStreamIterable.resumeAfter(resumeAfter);
            this.cachedResumeAfter = null;
        }
        LOGGER.debug("Cursor created");
        return changeStreamIterable.withDocumentClass(BsonDocument.class).iterator();
    }

    String getTopicNameFromNamespace(String str, BsonDocument bsonDocument) {
        String str2 = "";
        if (bsonDocument.containsKey("db")) {
            str2 = bsonDocument.getString("db").getValue();
            if (bsonDocument.containsKey("coll")) {
                str2 = String.format("%s.%s", str2, bsonDocument.getString("coll").getValue());
            }
        }
        return str.isEmpty() ? str2 : String.format("%s.%s", str, str2);
    }

    Map<String, Object> createPartitionMap(MongoSourceConfig mongoSourceConfig) {
        return Collections.singletonMap("ns", String.format("%s/%s.%s", mongoSourceConfig.getString("connection.uri"), mongoSourceConfig.getString("database"), mongoSourceConfig.getString("collection")));
    }

    private boolean shouldCopyData() {
        Map<String, Object> offset = getOffset(this.sourceConfig);
        return this.sourceConfig.getBoolean(MongoSourceConfig.COPY_EXISTING_CONFIG).booleanValue() && (offset == null || offset.containsKey("copy"));
    }

    private void setCachedResultAndResumeToken() {
        MongoChangeStreamCursor cursor = getChangeStreamIterable(this.sourceConfig, this.mongoClient).cursor();
        ChangeStreamDocument changeStreamDocument = (ChangeStreamDocument) cursor.tryNext();
        if (changeStreamDocument != null) {
            this.cachedResult = new BsonDocumentWrapper(changeStreamDocument, ChangeStreamDocument.createCodec(Document.class, MongoClientSettings.getDefaultCodecRegistry()));
        }
        this.cachedResumeAfter = changeStreamDocument != null ? changeStreamDocument.getResumeToken() : cursor.getResumeToken();
        cursor.close();
    }

    private Optional<BsonDocument> getNextDocument() {
        if (this.isCopying.get()) {
            Optional<BsonDocument> poll = this.copyDataManager.poll();
            if (poll.isPresent() || this.copyDataManager.isCopying()) {
                return poll;
            }
            this.isCopying.set(false);
            if (this.cachedResult != null) {
                Optional<BsonDocument> of = Optional.of(this.cachedResult);
                this.cachedResult = null;
                return of;
            }
        }
        if (this.cursor == null) {
            this.cursor = createCursor(this.sourceConfig, this.mongoClient);
        }
        try {
            return Optional.ofNullable((BsonDocument) this.cursor.tryNext());
        } catch (Exception e) {
            this.cursor.close();
            this.cursor = null;
            return Optional.empty();
        }
    }

    private ChangeStreamIterable<Document> getChangeStreamIterable(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        ChangeStreamIterable<Document> changeStreamIterable;
        String string = mongoSourceConfig.getString("database");
        String string2 = mongoSourceConfig.getString("collection");
        Optional<List<Document>> pipeline = mongoSourceConfig.getPipeline();
        if (string.isEmpty()) {
            LOGGER.info("Watching all changes on the cluster");
            Objects.requireNonNull(mongoClient);
            changeStreamIterable = (ChangeStreamIterable) pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
        } else if (string2.isEmpty()) {
            LOGGER.info("Watching for database changes on '{}'", string);
            MongoDatabase database = mongoClient.getDatabase(string);
            Objects.requireNonNull(database);
            changeStreamIterable = (ChangeStreamIterable) pipeline.map(database::watch).orElse(database.watch());
        } else {
            LOGGER.info("Watching for collection changes on '{}.{}'", string, string2);
            MongoCollection collection = mongoClient.getDatabase(string).getCollection(string2);
            Objects.requireNonNull(collection);
            changeStreamIterable = (ChangeStreamIterable) pipeline.map(collection::watch).orElse(collection.watch());
        }
        int intValue = mongoSourceConfig.getInt(MongoSourceConfig.BATCH_SIZE_CONFIG).intValue();
        if (intValue > 0) {
            changeStreamIterable.batchSize(intValue);
        }
        Optional<FullDocument> fullDocument = mongoSourceConfig.getFullDocument();
        ChangeStreamIterable<Document> changeStreamIterable2 = changeStreamIterable;
        Objects.requireNonNull(changeStreamIterable2);
        fullDocument.ifPresent(changeStreamIterable2::fullDocument);
        Optional<Collation> collation = mongoSourceConfig.getCollation();
        ChangeStreamIterable<Document> changeStreamIterable3 = changeStreamIterable;
        Objects.requireNonNull(changeStreamIterable3);
        collation.ifPresent(changeStreamIterable3::collation);
        return changeStreamIterable;
    }

    private Map<String, Object> getOffset(MongoSourceConfig mongoSourceConfig) {
        if (this.context != null) {
            return this.context.offsetStorageReader().offset(createPartitionMap(mongoSourceConfig));
        }
        return null;
    }

    private BsonDocument getResumeAfter(MongoSourceConfig mongoSourceConfig) {
        return getResumeAfter(getOffset(mongoSourceConfig));
    }

    private BsonDocument getResumeAfter(Map<String, Object> map) {
        if (this.cachedResumeAfter != null) {
            return this.cachedResumeAfter;
        }
        if (map == null || map.containsKey("initialSync")) {
            return null;
        }
        return BsonDocument.parse((String) map.get(MongoSinkTopicConfig.ID_FIELD));
    }
}
