package com.mongodb.kafka.connect.source;

import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.Versions;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducer;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers;
import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;
import com.mongodb.kafka.connect.util.ConfigHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mongodb/kafka/connect/source/MongoSourceTask.class */
public final class MongoSourceTask extends SourceTask {
    /** Logger shared by all instances of this task. */
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoSourceTask.class);
    /** Connector type handed to {@code ConfigHelper.getMongoDriverInformation} when the client is created. */
    private static final String CONNECTOR_TYPE = "source";
    /** Name of the {@code _id} field. */
    public static final String ID_FIELD = "_id";
    /** Source offset key that is present while existing data is being copied. */
    private static final String COPY_KEY = "copy";
    /** Key of the single entry held by the source partition map. */
    private static final String NS_KEY = "ns";
    /** Change stream document field that holds the full document. */
    private static final String FULL_DOCUMENT = "fullDocument";
    // Server error codes that are inspected when a change stream cursor cannot be created.
    private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
    private static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
    private static final int UNKNOWN_FIELD_ERROR = 40415;
    // NOTE(review): presumably paired with UNKNOWN_FIELD_ERROR to detect servers without `startAfter` - confirm.
    private static final int FAILED_TO_PARSE_ERROR = 9;
    // Lower-case fragments searched for in server error messages to recognise a missing resume token.
    private static final String RESUME_TOKEN = "resume token";
    private static final String NOT_FOUND = "not found";
    private static final String DOES_NOT_EXIST = "does not exist";
    private static final String INVALID_RESUME_TOKEN = "invalid resume token";
    /** Clock used for poll deadlines and for sleeping between polls. */
    private final Time time;
    /** Set at the end of {@code start} and cleared by {@code stop}; {@code poll} loops while it is set. */
    private final AtomicBoolean isRunning;
    /** Set while documents are served by the copy data manager instead of the change stream. */
    private final AtomicBoolean isCopying;
    /** Configuration parsed from the properties passed to {@code start}. */
    private MongoSourceConfig sourceConfig;
    /** Source partition, created on first use by {@code createPartitionMap} and then reused. */
    private Map<String, Object> partitionMap;
    /** Client created in {@code start} and closed in {@code stop}. */
    private MongoClient mongoClient;
    /** Created together with the change stream cursor; {@code null} until a cursor has been initialised. */
    private HeartbeatManager heartbeatManager;
    /** Starts as {@code true}; cleared when the server rejects the {@code startAfter} option. */
    private boolean supportsStartAfter;
    /** Marks that the next cursor must be opened without a stored resume token. */
    private boolean invalidatedCursor;
    /** Supplies the existing data while {@link #isCopying} is set. */
    private MongoCopyDataManager copyDataManager;
    /** Change stream document read before copying started; emitted once copying has finished. */
    private BsonDocument cachedResult;
    /** Resume token captured before copying started; consumed by {@code getResumeToken}. */
    private BsonDocument cachedResumeToken;
    /** Current change stream cursor, or {@code null} when none is open. */
    private MongoChangeStreamCursor<? extends BsonDocument> cursor;

    /** Creates a task that measures time with a {@link SystemTime} clock. */
    public MongoSourceTask() {
        this(new SystemTime());
    }

    /**
     * Creates a task backed by the supplied clock.
     *
     * <p>Both lifecycle flags start cleared, {@code startAfter} support is assumed and the
     * cursor is considered valid.
     *
     * @param time clock used for poll deadlines and sleeps
     */
    private MongoSourceTask(Time time) {
        this.time = time;
        this.isRunning = new AtomicBoolean(false);
        this.isCopying = new AtomicBoolean(false);
        this.invalidatedCursor = false;
        this.supportsStartAfter = true;
    }

    /**
     * Returns the connector version.
     *
     * @return the value of {@code Versions.VERSION}
     */
    @Override
    public String version() {
        return Versions.VERSION;
    }

    /**
     * Starts the task: parses the configuration, creates the MongoDB client and either
     * prepares the copy of existing data or opens the change stream cursor.
     *
     * <p>If anything fails after resources have been created they are released again via
     * {@link #stop()} before the failure is reported, so a failed start does not leak the
     * client, the cursor or the copy data manager.
     *
     * @param map the task configuration properties
     * @throws ConnectException if the task could not be started; the original failure is the cause
     */
    @Override
    public void start(Map<String, String> map) {
        LOGGER.info("Starting MongoDB source task");
        try {
            this.sourceConfig = new MongoSourceConfig(map);
            // Reset per-run state so a restarted task never reuses a stale partition or heartbeat manager.
            this.heartbeatManager = null;
            this.partitionMap = null;
            createPartitionMap(this.sourceConfig);
            this.mongoClient = MongoClients.create(this.sourceConfig.getConnectionString(), ConfigHelper.getMongoDriverInformation(CONNECTOR_TYPE));
            if (shouldCopyData()) {
                // Capture the resume point BEFORE copying so changes made during the copy are not lost.
                setCachedResultAndResumeToken();
                this.copyDataManager = new MongoCopyDataManager(this.sourceConfig, this.mongoClient);
                this.isCopying.set(true);
            } else {
                initializeCursorAndHeartbeatManager(this.time, this.sourceConfig, this.mongoClient);
            }
            this.isRunning.set(true);
            LOGGER.info("Started MongoDB source task");
        } catch (Exception e) {
            // Release whatever was created before the failure; keep the original failure as the primary error.
            try {
                stop();
            } catch (Exception cleanupFailure) {
                e.addSuppressed(cleanupFailure);
            }
            throw new ConnectException("Failed to start new task", e);
        }
    }

    /**
     * Polls for the next batch of source records.
     *
     * <p>Documents are read until the configured maximum batch size is reached or the
     * configured await time has elapsed. When the await time elapses without any record a
     * heartbeat record is returned if the heartbeat manager provides one.
     *
     * @return the collected records (possibly empty when a document had no topic), a single
     *     heartbeat record, or {@code null} when there is nothing to publish or the task was stopped
     */
    @Override
    public List<SourceRecord> poll() {
        final long startPoll = this.time.milliseconds();
        LOGGER.debug("Polling Start: {}", startPoll);
        final List<SourceRecord> sourceRecords = new ArrayList<>();
        final TopicMapper topicMapper = this.sourceConfig.getTopicMapper();
        final boolean publishFullDocumentOnly = this.sourceConfig.getBoolean(MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG);
        final int maxBatchSize = this.sourceConfig.getInt(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG);
        final long pollDeadline = startPoll + this.sourceConfig.getLong(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG);
        final Map<String, Object> partition = createPartitionMap(this.sourceConfig);
        final SchemaAndValueProducer keyProducer = SchemaAndValueProducers.createKeySchemaAndValueProvider(this.sourceConfig);
        final SchemaAndValueProducer valueProducer = SchemaAndValueProducers.createValueSchemaAndValueProvider(this.sourceConfig);

        while (this.isRunning.get()) {
            final Optional<BsonDocument> next = getNextDocument();
            final long remainingMs = pollDeadline - this.time.milliseconds();

            if (!next.isPresent()) {
                if (remainingMs <= 0) {
                    if (!sourceRecords.isEmpty()) {
                        return sourceRecords;
                    }
                    if (this.heartbeatManager != null) {
                        return this.heartbeatManager.heartbeat().map(Collections::singletonList).orElse(null);
                    }
                    return null;
                }
                LOGGER.debug("Waiting {} ms to poll", remainingMs);
                this.time.sleep(remainingMs);
                continue;
            }

            final BsonDocument changeStreamDocument = next.get();
            // The offset stores the document's _id (the resume token for change stream events).
            final Map<String, String> sourceOffset = new HashMap<>();
            sourceOffset.put(ID_FIELD, changeStreamDocument.getDocument(ID_FIELD).toJson());
            if (this.isCopying.get()) {
                sourceOffset.put(COPY_KEY, "true");
            }

            final String topic = topicMapper.getTopic(changeStreamDocument);
            if (topic.isEmpty()) {
                LOGGER.warn("No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
                return sourceRecords;
            }

            // In full-document-only mode events without an embedded full document are skipped.
            final Optional<BsonDocument> valueDocument;
            if (!publishFullDocumentOnly) {
                valueDocument = Optional.of(changeStreamDocument);
            } else if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
                valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
            } else {
                valueDocument = Optional.empty();
            }

            valueDocument.ifPresent(valueDoc -> {
                LOGGER.trace("Adding {} to {}: {}", valueDoc, topic, sourceOffset);
                final BsonDocument keyDocument = this.sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA
                        ? changeStreamDocument
                        : new BsonDocument(ID_FIELD, changeStreamDocument.get(ID_FIELD));
                createSourceRecord(partition, keyProducer, valueProducer, sourceOffset, topic, keyDocument, valueDoc)
                        .ifPresent(sourceRecords::add);
            });

            if (sourceRecords.size() == maxBatchSize) {
                LOGGER.debug("Reached '{}': {}, returning records", MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, maxBatchSize);
                return sourceRecords;
            }
        }
        return null;
    }

    /**
     * Converts a key/value document pair into a {@link SourceRecord}.
     *
     * <p>When the conversion fails the error is logged if the configuration asks for it. If
     * errors are tolerated the pair is either dropped or, when a dead letter queue topic is
     * configured, published there as JSON strings; otherwise the failure is rethrown.
     *
     * @param map source partition of the record
     * @param schemaAndValueProducer producer for the key schema and value
     * @param schemaAndValueProducer2 producer for the value schema and value
     * @param map2 source offset of the record
     * @param str target topic
     * @param bsonDocument key document
     * @param bsonDocument2 value document
     * @return the record to publish, or empty when a tolerated error occurred and no dead letter queue is configured
     * @throws DataException if the conversion fails and errors are not tolerated
     */
    private Optional<SourceRecord> createSourceRecord(Map<String, Object> map, SchemaAndValueProducer schemaAndValueProducer, SchemaAndValueProducer schemaAndValueProducer2, Map<String, String> map2, String str, BsonDocument bsonDocument, BsonDocument bsonDocument2) {
        try {
            final SchemaAndValue key = schemaAndValueProducer.get(bsonDocument);
            final SchemaAndValue value = schemaAndValueProducer2.get(bsonDocument2);
            final SourceRecord record = new SourceRecord(map, map2, str, key.schema(), key.value(), value.schema(), value.value());
            return Optional.of(record);
        } catch (Exception e) {
            // Built lazily: the JSON rendering is only paid for when the message is actually used.
            final Supplier<String> errorMessage = () ->
                    String.format("Exception creating Source record for: Key=%s Value=%s", bsonDocument.toJson(), bsonDocument2.toJson());
            if (this.sourceConfig.logErrors()) {
                LOGGER.error(errorMessage.get(), e);
            }
            if (!this.sourceConfig.tolerateErrors()) {
                throw new DataException(errorMessage.get(), e);
            }
            final String deadLetterQueueTopic = this.sourceConfig.getString(MongoSourceConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
            if (deadLetterQueueTopic.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(new SourceRecord(map, map2, deadLetterQueueTopic, Schema.STRING_SCHEMA, bsonDocument.toJson(), Schema.STRING_SCHEMA, bsonDocument2.toJson()));
        }
    }

    /**
     * Stops the task and releases the copy data manager, the change stream cursor and the
     * MongoDB client.
     *
     * <p>Every resource is closed even when closing an earlier one throws, and the internal
     * state is always reset; the first failure still propagates to the caller. Calling this
     * method on a task that holds no resources is a no-op apart from resetting the flags.
     */
    @Override
    public synchronized void stop() {
        LOGGER.info("Stopping MongoDB source task");
        this.isRunning.set(false);
        this.isCopying.set(false);
        try {
            if (this.copyDataManager != null) {
                this.copyDataManager.close();
            }
        } finally {
            this.copyDataManager = null;
            try {
                if (this.cursor != null) {
                    this.cursor.close();
                }
            } finally {
                this.cursor = null;
                try {
                    if (this.mongoClient != null) {
                        this.mongoClient.close();
                    }
                } finally {
                    this.mongoClient = null;
                    // Back to the initial assumptions so that a later start() begins from a clean slate.
                    this.supportsStartAfter = true;
                    this.invalidatedCursor = false;
                }
            }
        }
    }

    /**
     * Opens a change stream cursor and creates the heartbeat manager that observes it.
     *
     * @param time clock handed to the heartbeat manager
     * @param mongoSourceConfig configuration providing the heartbeat interval and topic
     * @param mongoClient client used to open the change stream
     */
    void initializeCursorAndHeartbeatManager(Time time, MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        final MongoChangeStreamCursor<? extends BsonDocument> newCursor = createCursor(mongoSourceConfig, mongoClient);
        this.cursor = newCursor;

        final long heartbeatIntervalMs = mongoSourceConfig.getLong(MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
        final String heartbeatTopic = mongoSourceConfig.getString(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG);
        this.heartbeatManager = new HeartbeatManager(time, newCursor, heartbeatIntervalMs, heartbeatTopic, this.partitionMap);
    }

    /**
     * Creates a change stream cursor positioned at the resume token returned by
     * {@code getResumeToken}, if there is one.
     *
     * @param mongoSourceConfig the source configuration
     * @param mongoClient client used to open the change stream
     * @return the cursor, or {@code null} when none could be created
     */
    MongoChangeStreamCursor<? extends BsonDocument> createCursor(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        LOGGER.debug("Creating a MongoCursor");
        final BsonDocument resumeToken = getResumeToken(mongoSourceConfig);
        return tryCreateCursor(mongoSourceConfig, mongoClient, resumeToken);
    }

    /**
     * Tries to open a change stream cursor, optionally resuming from the given token.
     *
     * <p>On a {@link MongoCommandException} the method adjusts its state and retries itself:
     * without a token when the token was invalidated, with {@code resumeAfter} when the
     * server rejects {@code startAfter}, and without a token when the token cannot be found
     * and errors are tolerated.
     *
     * @param mongoSourceConfig the source configuration
     * @param mongoClient client used to open the change stream
     * @param bsonDocument resume token, or {@code null} to start without one
     * @return the cursor, or {@code null} when the namespace does not exist or the failure is
     *     not recognised as a missing resume token
     * @throws ConnectException if the resume token cannot be found and the error is not tolerated
     */
    private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient, BsonDocument bsonDocument) {
        try {
            ChangeStreamIterable<Document> changeStreamIterable = getChangeStreamIterable(mongoSourceConfig, mongoClient);
            if (bsonDocument != null && this.supportsStartAfter) {
                LOGGER.info("Resuming the change stream after the previous offset: {}", bsonDocument);
                changeStreamIterable.startAfter(bsonDocument);
            } else if (bsonDocument == null || this.invalidatedCursor) {
                // Either there is no token, or the token must be ignored because the cursor was invalidated.
                LOGGER.info("New change stream cursor created without offset.");
            } else {
                // startAfter was rejected earlier: fall back to resumeAfter.
                LOGGER.info("Resuming the change stream after the previous offset using resumeAfter: {}", bsonDocument);
                changeStreamIterable.resumeAfter(bsonDocument);
            }
            return (MongoChangeStreamCursor) changeStreamIterable.withDocumentClass(BsonDocument.class).cursor();
        } catch (MongoCommandException e) {
            // The recovery paths below only apply when a resume token was actually used.
            if (bsonDocument != null) {
                if (invalidatedResumeToken(e)) {
                    this.invalidatedCursor = true;
                    return tryCreateCursor(mongoSourceConfig, mongoClient, null);
                }
                if (doesNotSupportsStartAfter(e)) {
                    // Retry with the same token; the cleared flag selects resumeAfter this time.
                    this.supportsStartAfter = false;
                    return tryCreateCursor(mongoSourceConfig, mongoClient, bsonDocument);
                }
                if (mongoSourceConfig.tolerateErrors() && resumeTokenNotFound(e)) {
                    LOGGER.warn("Failed to resume change stream: {} {}\n===================================================================================\nWhen the resume token is no longer available there is the potential for data loss.\n\nRestarting the change stream with no resume token because `errors.tolerance=all`.\n===================================================================================\n", e.getErrorMessage(), Integer.valueOf(e.getErrorCode()));
                    this.invalidatedCursor = true;
                    return tryCreateCursor(mongoSourceConfig, mongoClient, null);
                }
            }
            if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
                // No cursor is returned; getNextDocument tries to create one again on a later call.
                LOGGER.info("Namespace not found cursor closed.");
                return null;
            }
            LOGGER.warn("Failed to resume change stream: {} {}\n\n=====================================================================================\nIf the resume token is no longer available then there is the potential for data loss.\nSaved resume tokens are managed by Kafka and stored with the offset data.\n\nTo restart the change stream with no resume token either: \n  * Create a new partition name using the `offset.partition.name` configuration.\n  * Set `errors.tolerance=all` and ignore the erroring resume token. \n  * Manually remove the old offset from its configured storage.\n\nResetting the offset will allow for the connector to be resume from the latest resume\ntoken. Using `copy.existing=true` ensures that all data will be outputted by the\nconnector but it will duplicate existing data.\n=====================================================================================\n", e.getErrorMessage(), Integer.valueOf(e.getErrorCode()));
            if (resumeTokenNotFound(e)) {
                throw new ConnectException("ResumeToken not found. Cannot create a change stream cursor", e);
            }
            return null;
        }
    }

    /**
     * Tells whether the server rejected the {@code startAfter} option.
     *
     * @param mongoCommandException the failure raised while creating the cursor
     * @return {@code true} when the error code is "failed to parse" or "unknown field" and the
     *     message mentions {@code startAfter}
     */
    private boolean doesNotSupportsStartAfter(MongoCommandException mongoCommandException) {
        final int errorCode = mongoCommandException.getErrorCode();
        if (errorCode != FAILED_TO_PARSE_ERROR && errorCode != UNKNOWN_FIELD_ERROR) {
            return false;
        }
        return mongoCommandException.getErrorMessage().contains("startAfter");
    }

    /**
     * Tells whether the failure carries the "invalidated resume token" error code.
     *
     * @param mongoCommandException the failure raised while creating the cursor
     * @return {@code true} when the error code equals {@code INVALIDATED_RESUME_TOKEN_ERROR}
     */
    private boolean invalidatedResumeToken(MongoCommandException mongoCommandException) {
        final int errorCode = mongoCommandException.getErrorCode();
        return INVALIDATED_RESUME_TOKEN_ERROR == errorCode;
    }

    /**
     * Tells whether the error message reports a resume token that is missing or invalid.
     *
     * <p>The message is compared case-insensitively (lower-cased with {@link Locale#ROOT}).
     *
     * @param mongoCommandException the failure raised while creating the cursor
     * @return {@code true} when the message mentions a resume token together with one of the
     *     known "not found" / "does not exist" / "invalid resume token" fragments
     */
    private boolean resumeTokenNotFound(MongoCommandException mongoCommandException) {
        final String message = mongoCommandException.getErrorMessage().toLowerCase(Locale.ROOT);
        if (!message.contains(RESUME_TOKEN)) {
            return false;
        }
        return message.contains(NOT_FOUND)
                || message.contains(DOES_NOT_EXIST)
                || message.contains(INVALID_RESUME_TOKEN);
    }

    /**
     * Returns the source partition, creating and caching it on first use.
     *
     * <p>The partition name is the configured offset partition name or, when that is empty,
     * the default name derived from the connection string and namespace.
     *
     * @param mongoSourceConfig the source configuration
     * @return the cached single-entry partition map
     */
    Map<String, Object> createPartitionMap(MongoSourceConfig mongoSourceConfig) {
        if (this.partitionMap != null) {
            return this.partitionMap;
        }
        final String configuredName = mongoSourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG);
        final String partitionName = configuredName.isEmpty() ? createDefaultPartitionName(mongoSourceConfig) : configuredName;
        this.partitionMap = Collections.singletonMap(NS_KEY, partitionName);
        return this.partitionMap;
    }

    /**
     * Builds the source partition in the legacy format.
     *
     * @param mongoSourceConfig the source configuration
     * @return a single-entry map holding the legacy partition name
     */
    Map<String, Object> createLegacyPartitionMap(MongoSourceConfig mongoSourceConfig) {
        final String legacyName = createLegacyPartitionName(mongoSourceConfig);
        return Collections.singletonMap(NS_KEY, legacyName);
    }

    /**
     * Builds the legacy partition name {@code <connection.uri>/<database>.<collection>}.
     *
     * @param mongoSourceConfig the source configuration
     * @return the legacy partition name
     */
    String createLegacyPartitionName(MongoSourceConfig mongoSourceConfig) {
        final String connectionUri = mongoSourceConfig.getString("connection.uri");
        final String database = mongoSourceConfig.getString("database");
        final String collection = mongoSourceConfig.getString("collection");
        return connectionUri + "/" + database + "." + collection;
    }

    /**
     * Builds the default partition name from the connection string hosts and the namespace.
     *
     * <p>The result has the form {@code <scheme><host[,host...]>/<database>[.<collection>]};
     * the collection part is omitted when no collection is configured.
     *
     * @param mongoSourceConfig the source configuration
     * @return the default partition name
     */
    String createDefaultPartitionName(MongoSourceConfig mongoSourceConfig) {
        final ConnectionString connectionString = mongoSourceConfig.getConnectionString();
        final String scheme = connectionString.isSrvProtocol() ? "mongodb+srv://" : "mongodb://";
        final String hosts = String.join(",", connectionString.getHosts());
        final String database = mongoSourceConfig.getString("database");
        final String collection = mongoSourceConfig.getString("collection");
        final String namespace = collection.isEmpty() ? database : database + "." + collection;
        return scheme + hosts + "/" + namespace;
    }

    /**
     * Decides whether existing data has to be copied before the change stream is consumed.
     *
     * @return {@code true} when copying is enabled and there is either no stored offset or
     *     the stored offset was written while copying
     */
    private boolean shouldCopyData() {
        final Map<String, Object> storedOffset = getOffset(this.sourceConfig);
        final boolean copyExisting = this.sourceConfig.getBoolean(MongoSourceConfig.COPY_EXISTING_CONFIG);
        if (!copyExisting) {
            return false;
        }
        return storedOffset == null || storedOffset.containsKey(COPY_KEY);
    }

    /**
     * Opens a short-lived change stream cursor and remembers where the stream currently is.
     *
     * <p>If a change is immediately available it is cached together with its resume token;
     * otherwise the cursor's own resume token is cached. The cursor is always closed, also
     * when reading from it fails. A "namespace not found" error is ignored and leaves the
     * cached values untouched.
     *
     * @throws ConnectException if the server reports any error other than "namespace not found"
     */
    private void setCachedResultAndResumeToken() {
        try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> changeStreamCursor =
                getChangeStreamIterable(this.sourceConfig, this.mongoClient).cursor()) {
            final ChangeStreamDocument<Document> firstResult = changeStreamCursor.tryNext();
            if (firstResult != null) {
                this.cachedResult = new BsonDocumentWrapper<>(firstResult, ChangeStreamDocument.createCodec(Document.class, MongoClientSettings.getDefaultCodecRegistry()));
                this.cachedResumeToken = firstResult.getResumeToken();
            } else {
                this.cachedResumeToken = changeStreamCursor.getResumeToken();
            }
        } catch (MongoCommandException e) {
            if (e.getErrorCode() != NAMESPACE_NOT_FOUND_ERROR) {
                throw new ConnectException(e);
            }
        }
    }

    /**
     * Returns the next document to publish, if one is available.
     *
     * <p>While copying, documents come from the copy data manager. Once copying has finished
     * the cached change stream document (if any) is returned, and after that documents are
     * read from the change stream cursor, which is created on demand.
     *
     * @return the next document, or empty when none is available right now or reading failed
     */
    private Optional<BsonDocument> getNextDocument() {
        if (this.isCopying.get()) {
            Optional<BsonDocument> poll = this.copyDataManager.poll();
            // An empty result while the manager is still copying just means "nothing yet".
            if (poll.isPresent() || this.copyDataManager.isCopying()) {
                return poll;
            }
            LOGGER.info("Shutting down executors");
            this.isCopying.set(false);
            // Emit the change captured before the copy started, exactly once.
            if (this.cachedResult != null) {
                Optional<BsonDocument> of = Optional.of(this.cachedResult);
                this.cachedResult = null;
                return of;
            }
            LOGGER.info("Finished copying existing data from the collection(s).");
        }
        if (this.cursor == null) {
            initializeCursorAndHeartbeatManager(this.time, this.sourceConfig, this.mongoClient);
        }
        // Cursor creation can yield null; report "no document" in that case.
        if (this.cursor == null) {
            return Optional.empty();
        }
        try {
            BsonDocument tryNext = this.cursor.tryNext();
            // No document and no server cursor: replace the cursor and try once more.
            if (tryNext == null && this.cursor.getServerCursor() == null) {
                invalidateCursorAndReinitialize();
                tryNext = this.cursor != null ? this.cursor.tryNext() : null;
            }
            return Optional.ofNullable(tryNext);
        } catch (Exception e) {
            // Drop the cursor so that the next call creates a fresh one.
            if (this.cursor != null) {
                this.cursor.close();
                this.cursor = null;
            }
            // Only logged while running (stop() clears the flag and closes the cursor).
            if (this.isRunning.get()) {
                LOGGER.info("An exception occurred when trying to get the next item from the Change Stream: {}", e.getMessage());
            }
            return Optional.empty();
        }
    }

    /**
     * Marks the current cursor as invalidated, closes it and creates a new cursor together
     * with a new heartbeat manager.
     */
    private void invalidateCursorAndReinitialize() {
        // Set before re-initialising: getResumeToken consumes this flag and then skips the stored offset.
        this.invalidatedCursor = true;
        this.cursor.close();
        this.cursor = null;
        initializeCursorAndHeartbeatManager(this.time, this.sourceConfig, this.mongoClient);
    }

    /**
     * Builds the change stream for the configured scope and applies the optional settings.
     *
     * <p>The scope is the whole cluster when no database is configured, a database when no
     * collection is configured, and a single collection otherwise. The configured pipeline,
     * batch size (only when positive), full document mode and collation are applied when present.
     *
     * @param mongoSourceConfig the source configuration
     * @param mongoClient client used to create the change stream
     * @return the configured change stream; nothing is sent to the server until a cursor is opened
     */
    private ChangeStreamIterable<Document> getChangeStreamIterable(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        final String database = mongoSourceConfig.getString("database");
        final String collection = mongoSourceConfig.getString("collection");
        final Optional<List<Document>> pipeline = mongoSourceConfig.getPipeline();

        final ChangeStreamIterable<Document> changeStream;
        if (database.isEmpty()) {
            LOGGER.info("Watching all changes on the cluster");
            changeStream = pipeline.map(mongoClient::watch).orElseGet(() -> mongoClient.watch());
        } else if (collection.isEmpty()) {
            LOGGER.info("Watching for database changes on '{}'", database);
            final MongoDatabase db = mongoClient.getDatabase(database);
            changeStream = pipeline.map(db::watch).orElseGet(() -> db.watch());
        } else {
            LOGGER.info("Watching for collection changes on '{}.{}'", database, collection);
            final MongoCollection<Document> coll = mongoClient.getDatabase(database).getCollection(collection);
            changeStream = pipeline.map(coll::watch).orElseGet(() -> coll.watch());
        }

        final int batchSize = mongoSourceConfig.getInt(MongoSourceConfig.BATCH_SIZE_CONFIG);
        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }
        mongoSourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
        mongoSourceConfig.getCollation().ifPresent(changeStream::collation);
        return changeStream;
    }

    /**
     * Looks up the stored source offset for this task's partition.
     *
     * <p>When nothing is stored under the current partition and no custom partition name is
     * configured, the legacy partition is consulted as a fallback.
     *
     * @param mongoSourceConfig the source configuration
     * @return the stored offset, or {@code null} when there is no task context or no offset
     */
    Map<String, Object> getOffset(MongoSourceConfig mongoSourceConfig) {
        if (this.context == null) {
            return null;
        }
        final Map<String, Object> currentOffset = this.context.offsetStorageReader().offset(createPartitionMap(mongoSourceConfig));
        if (currentOffset != null) {
            return currentOffset;
        }
        final boolean customPartitionName = !mongoSourceConfig.getString(MongoSourceConfig.OFFSET_PARTITION_NAME_CONFIG).isEmpty();
        if (customPartitionName) {
            return null;
        }
        return this.context.offsetStorageReader().offset(createLegacyPartitionMap(mongoSourceConfig));
    }

    /**
     * Determines the resume token for the next change stream cursor.
     *
     * <p>Precedence: a cached token (consumed by this call), then an invalidated cursor (the
     * flag is consumed and no token is returned), then the stored offset unless it was
     * written while copying.
     *
     * @param mongoSourceConfig the source configuration
     * @return the resume token, or {@code null} to start without one
     */
    BsonDocument getResumeToken(MongoSourceConfig mongoSourceConfig) {
        if (this.cachedResumeToken != null) {
            final BsonDocument cachedToken = this.cachedResumeToken;
            this.cachedResumeToken = null;
            return cachedToken;
        }
        if (this.invalidatedCursor) {
            this.invalidatedCursor = false;
            return null;
        }
        final Map<String, Object> storedOffset = getOffset(mongoSourceConfig);
        if (storedOffset == null || !storedOffset.containsKey(ID_FIELD) || storedOffset.containsKey(COPY_KEY)) {
            return null;
        }
        final BsonDocument storedToken = BsonDocument.parse((String) storedOffset.get(ID_FIELD));
        if (storedOffset.containsKey(HeartbeatManager.HEARTBEAT_KEY)) {
            LOGGER.info("Resume token from heartbeat: {}", storedToken);
        }
        return storedToken;
    }
}
