package com.mongodb.kafka.connect.source;

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducer;
import com.mongodb.kafka.connect.source.producer.SchemaAndValueProducers;
import com.mongodb.kafka.connect.source.statistics.StatisticsManager;
import com.mongodb.kafka.connect.source.topic.mapping.TopicMapper;
import com.mongodb.kafka.connect.util.Assertions;
import com.mongodb.kafka.connect.util.jmx.SourceTaskStatistics;
import com.mongodb.kafka.connect.util.time.InnerOuterTimer;
import com.mongodb.lang.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWrapper;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.RawBsonDocument;

/* loaded from: input_file:com/mongodb/kafka/connect/source/StartedMongoSourceTask.class */
final class StartedMongoSourceTask implements AutoCloseable {
    /** Name of the change stream event field that carries the full document. */
    private static final String FULL_DOCUMENT = "fullDocument";

    // Error codes compared against MongoCommandException#getErrorCode() in tryCreateCursor
    // and setCachedResultAndResumeToken.
    private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
    private static final int ILLEGAL_OPERATION_ERROR = 20;
    private static final int UNKNOWN_FIELD_ERROR = 40415;

    // Lower-case fragments searched for in error messages by changeStreamNotValid.
    private static final String RESUME_TOKEN = "resume token";
    private static final String RESUME_POINT = "resume point";
    private static final String NOT_FOUND = "not found";
    private static final String DOES_NOT_EXIST = "does not exist";
    private static final String INVALID_RESUME_TOKEN = "invalid resume token";
    private static final String NO_LONGER_IN_THE_OPLOG = "no longer be in the oplog";

    /** Supplies the task context used to look up the stored source offset. */
    private final Supplier<SourceTaskContext> sourceTaskContextAccessor;
    /** Clock handed to the {@link HeartbeatManager}. */
    private final Time time;
    /** True while this task is still emitting copied (pre-existing) data. */
    private boolean isCopying;
    private final MongoSourceConfig sourceConfig;
    /** Source partition attached to every emitted record. */
    private final Map<String, Object> partitionMap;
    private final MongoClient mongoClient;
    /** Created together with the cursor; stays null until a cursor has been initialized. */
    private HeartbeatManager heartbeatManager;

    /** Present only when the task was started with a copy manager. */
    @Nullable
    private final MongoCopyDataManager copyDataManager;
    /** First change event observed before copying started, replayed once copying finishes. */
    private BsonDocument cachedResult;
    /** Resume token captured before copying started; consumed by getResumeToken. */
    private BsonDocument cachedResumeToken;

    /** Active change stream cursor, or null when none could be created or it was closed. */
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> cursor;
    private final StatisticsManager statisticsManager;
    private final InnerOuterTimer inTaskPollInConnectFrameworkTimer;

    // Error codes that changeStreamNotValid treats as "the change stream cannot be resumed".
    private static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
    private static final int CHANGE_STREAM_FATAL_ERROR = 280;
    private static final int CHANGE_STREAM_HISTORY_LOST = 286;
    private static final int BSON_OBJECT_TOO_LARGE = 10334;
    private static final Set<Integer> INVALID_CHANGE_STREAM_ERRORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            INVALIDATED_RESUME_TOKEN_ERROR,
            CHANGE_STREAM_FATAL_ERROR,
            CHANGE_STREAM_HISTORY_LOST,
            BSON_OBJECT_TOO_LARGE)));

    /** Value producer that always yields {@link SchemaAndValue#NULL}; used for tombstone records. */
    private static final SchemaAndValueProducer TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER = bsonDocument -> SchemaAndValue.NULL;

    /** Cleared the first time the server rejects {@code startAfter}; resumeAfter is used afterwards. */
    private boolean supportsStartAfter = true;
    /** Set when the current resume position must be discarded; reset by getResumeToken. */
    private boolean invalidatedCursor = false;
    /** Cleared by close(); read by getNextBatch to decide whether read errors are surfaced. */
    private volatile boolean isRunning = true;
    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a started task.
     *
     * <p>When a copy manager is supplied the task begins in copy mode and only captures the
     * current change stream position; otherwise the change stream cursor and heartbeat manager
     * are created immediately.
     *
     * @param supplier accessor for the Connect task context
     * @param mongoSourceConfig connector configuration
     * @param mongoClient client used to open change streams; closed by {@link #close()}
     * @param mongoCopyDataManager copy manager, or null when no existing data is copied
     * @param statisticsManager sink for task timing and record statistics
     */
    public StartedMongoSourceTask(Supplier<SourceTaskContext> supplier, MongoSourceConfig mongoSourceConfig, MongoClient mongoClient, @Nullable MongoCopyDataManager mongoCopyDataManager, StatisticsManager statisticsManager) {
        this.sourceTaskContextAccessor = supplier;
        this.sourceConfig = mongoSourceConfig;
        this.mongoClient = mongoClient;

        final boolean copyRequested = mongoCopyDataManager != null;
        if (copyRequested) {
            // A copy manager is only legal in COPY_EXISTING startup mode.
            Assertions.assertTrue(mongoSourceConfig.getStartupConfig().startupMode() == MongoSourceConfig.StartupConfig.StartupMode.COPY_EXISTING);
        }
        this.isCopying = copyRequested;
        this.time = new SystemTime();
        this.partitionMap = MongoSourceTask.createPartitionMap(mongoSourceConfig);
        this.copyDataManager = mongoCopyDataManager;

        if (!copyRequested) {
            initializeCursorAndHeartbeatManager();
        } else {
            setCachedResultAndResumeToken();
        }

        this.statisticsManager = statisticsManager;
        this.inTaskPollInConnectFrameworkTimer = InnerOuterTimer.start(
                timeInPoll -> {
                    SourceTaskStatistics stats = statisticsManager.currentStatistics();
                    stats.getInTaskPoll().sample(timeInPoll.toMillis());
                    if (MongoSourceTask.LOGGER.isDebugEnabled()) {
                        MongoSourceTask.LOGGER.debug(stats.getName() + ": " + stats.toJSON());
                    }
                },
                timeInFramework -> {
                    statisticsManager.currentStatistics().getInConnectFramework().sample(timeInFramework.toMillis());
                });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Polls for the next batch of source records while timing the call.
     *
     * <p>Once copying has finished, statistics are switched to the stream statistics before
     * polling. The number of returned records is sampled when a batch is produced.
     *
     * @return the records to publish, or null when there is nothing to publish
     */
    public List<SourceRecord> poll() {
        if (!this.isCopying) {
            this.statisticsManager.switchToStreamStatistics();
        }
        // try-with-resources closes the timer exactly once, including when close() itself throws.
        try (InnerOuterTimer.InnerTimer ignored = this.inTaskPollInConnectFrameworkTimer.sampleOuter()) {
            List<SourceRecord> records = pollInternal();
            if (records != null) {
                this.statisticsManager.currentStatistics().getRecords().sample(records.size());
            }
            return records;
        }
    }

    /**
     * Converts the next batch of change stream (or copied) documents into source records.
     *
     * <p>Each record's source offset holds the document's {@code _id} as JSON, plus a
     * {@code copy} marker while copying. For the last document of a batch, once the copy
     * manager reports that copying is done, the offset is replaced by the cached resume token
     * so that streaming can continue from the position captured before the copy began.
     *
     * @return the records created from the batch; a single heartbeat record when the batch
     *     produced nothing and a heartbeat is due; otherwise null
     */
    @Nullable
    private List<SourceRecord> pollInternal() {
        TopicMapper topicMapper = this.sourceConfig.getTopicMapper();
        boolean publishFullDocumentOnly = this.sourceConfig.getBoolean(MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG).booleanValue();
        // The tombstone option is only consulted when full-document-only publishing is on.
        boolean tombstoneOnDelete = publishFullDocumentOnly
                && this.sourceConfig.getBoolean(MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_TOMBSTONE_ON_DELETE_CONFIG).booleanValue();
        SchemaAndValueProducer keyProducer = SchemaAndValueProducers.createKeySchemaAndValueProvider(this.sourceConfig);
        SchemaAndValueProducer valueProducer = SchemaAndValueProducers.createValueSchemaAndValueProvider(this.sourceConfig);

        List<SourceRecord> sourceRecords = new ArrayList<>();
        Iterator<BsonDocument> batch = getNextBatch().iterator();
        while (batch.hasNext()) {
            BsonDocument changeStreamDocument = batch.next();

            Map<String, String> sourceOffset = new HashMap<>();
            sourceOffset.put("_id", changeStreamDocument.getDocument("_id").toJson());
            if (this.isCopying) {
                sourceOffset.put("copy", "true");
            }
            boolean lastInBatch = !batch.hasNext();
            boolean copyFinished = this.copyDataManager != null && !this.copyDataManager.isCopying();
            if (this.isCopying && lastInBatch && copyFinished && this.cachedResumeToken != null) {
                // Final copied document: hand over to the resume token captured before copying.
                sourceOffset.put("_id", this.cachedResumeToken.toJson());
                sourceOffset.remove("copy");
            }

            String topicName = topicMapper.getTopic(changeStreamDocument);
            if (topicName.isEmpty()) {
                MongoSourceTask.LOGGER.warn("No topic set. Could not publish the message: {}", changeStreamDocument.toJson());
                continue;
            }

            boolean emitTombstone = tombstoneOnDelete && !changeStreamDocument.containsKey(FULL_DOCUMENT);
            Optional<BsonDocument> valueDocument = Optional.empty();
            if (!publishFullDocumentOnly) {
                valueDocument = Optional.of(changeStreamDocument);
            } else if (changeStreamDocument.containsKey(FULL_DOCUMENT) && changeStreamDocument.get(FULL_DOCUMENT).isDocument()) {
                valueDocument = Optional.of(changeStreamDocument.getDocument(FULL_DOCUMENT));
            }
            if (!valueDocument.isPresent() && !emitTombstone) {
                // Full-document-only mode without a full document and without tombstones: skip.
                continue;
            }

            BsonDocument valueDoc = valueDocument.orElse(new BsonDocument());
            MongoSourceTask.LOGGER.trace("Adding {} to {}: {}", valueDoc, topicName, sourceOffset);
            if (valueDoc instanceof RawBsonDocument) {
                this.statisticsManager.currentStatistics().getMongodbBytesRead().sample(((RawBsonDocument) valueDoc).getByteBuffer().limit());
            }

            BsonDocument keyDoc;
            if (this.sourceConfig.getKeyOutputFormat() == MongoSourceConfig.OutputFormat.SCHEMA) {
                keyDoc = changeStreamDocument;
            } else if (this.sourceConfig.getBoolean(MongoSourceConfig.DOCUMENT_KEY_AS_KEY_CONFIG).booleanValue()
                    && changeStreamDocument.containsKey(MongoSourceTask.DOCUMENT_KEY_FIELD)) {
                keyDoc = changeStreamDocument.getDocument(MongoSourceTask.DOCUMENT_KEY_FIELD);
            } else {
                keyDoc = new BsonDocument("_id", changeStreamDocument.get("_id"));
            }

            createSourceRecord(
                            keyProducer,
                            emitTombstone ? TOMBSTONE_SCHEMA_AND_VALUE_PRODUCER : valueProducer,
                            sourceOffset,
                            topicName,
                            keyDoc,
                            valueDoc)
                    .ifPresent(sourceRecords::add);
        }

        MongoSourceTask.LOGGER.debug("Return batch of {}", sourceRecords.size());
        if (!sourceRecords.isEmpty()) {
            return sourceRecords;
        }
        if (this.heartbeatManager != null) {
            Optional<SourceRecord> heartbeat = this.heartbeatManager.heartbeat();
            if (heartbeat.isPresent()) {
                MongoSourceTask.LOGGER.debug("Returning single heartbeat record");
                return Collections.singletonList(heartbeat.get());
            }
        }
        MongoSourceTask.LOGGER.debug("Returning null because there are no source records and no heartbeat.");
        return null;
    }

    /**
     * Builds a source record from a key document and a value document.
     *
     * <p>If either producer fails, the failure is logged when error logging is enabled and
     * rethrown as a {@link DataException} unless errors are tolerated. When errors are
     * tolerated, the record is redirected to the dead letter queue topic as JSON strings, or
     * dropped when no such topic is configured.
     *
     * @param schemaAndValueProducer producer for the record key
     * @param schemaAndValueProducer2 producer for the record value
     * @param map source offset of the record
     * @param str destination topic
     * @param bsonDocument key document, may be null
     * @param bsonDocument2 value document, may be null
     * @return the record, or empty when a tolerated failure has no dead letter queue topic
     * @throws DataException when record creation fails and errors are not tolerated
     */
    private Optional<SourceRecord> createSourceRecord(SchemaAndValueProducer schemaAndValueProducer, SchemaAndValueProducer schemaAndValueProducer2, Map<String, String> map, String str, @Nullable BsonDocument bsonDocument, @Nullable BsonDocument bsonDocument2) {
        try {
            SchemaAndValue key = schemaAndValueProducer.get(bsonDocument);
            SchemaAndValue value = schemaAndValueProducer2.get(bsonDocument2);
            SourceRecord record = new SourceRecord(this.partitionMap, map, str, key.schema(), key.value(), value.schema(), value.value());
            return Optional.of(record);
        } catch (Exception e) {
            // Built lazily: the JSON rendering is only needed when logging or rethrowing.
            final Supplier<String> failureDescription = () -> String.format(
                    "%s : Exception creating Source record for: Key=%s Value=%s",
                    e.getMessage(),
                    jsonOrEmpty(bsonDocument),
                    jsonOrEmpty(bsonDocument2));

            if (this.sourceConfig.logErrors()) {
                MongoSourceTask.LOGGER.error(failureDescription.get(), e);
            }
            if (!this.sourceConfig.tolerateErrors()) {
                throw new DataException(failureDescription.get(), e);
            }

            final String dlqTopic = this.sourceConfig.getDlqTopic();
            if (dlqTopic.isEmpty()) {
                return Optional.empty();
            }
            SourceRecord dlqRecord = new SourceRecord(this.partitionMap, map, dlqTopic, Schema.STRING_SCHEMA, jsonOrEmpty(bsonDocument), Schema.STRING_SCHEMA, jsonOrEmpty(bsonDocument2));
            return Optional.of(dlqRecord);
        }
    }

    /** Renders a document as JSON, or the empty string when the document is null. */
    private static String jsonOrEmpty(@Nullable BsonDocument document) {
        return document == null ? "" : document.toJson();
    }

    /**
     * Stops the task and releases its resources.
     *
     * <p>Resources are closed in this order: copy manager, change stream cursor, Mongo client,
     * statistics manager. Null resources are skipped. Each resource is closed exactly once;
     * if several closes fail, the first failure is thrown and the others are attached to it as
     * suppressed exceptions.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        MongoSourceTask.LOGGER.info("Stopping MongoDB source task");
        this.isRunning = false;
        // try-with-resources closes in reverse declaration order.
        try (StatisticsManager ignoredStatistics = this.statisticsManager;
                MongoClient ignoredClient = this.mongoClient;
                MongoChangeStreamCursor<? extends BsonDocument> ignoredCursor = this.cursor;
                MongoCopyDataManager ignoredCopyManager = this.copyDataManager) {
            // Intentionally empty: the block exists only to close the resources above.
        }
    }

    /**
     * (Re)creates the change stream cursor and a heartbeat manager bound to that cursor.
     * The cursor may end up null when none could be created.
     */
    private void initializeCursorAndHeartbeatManager() {
        this.cursor = createCursor(this.sourceConfig, this.mongoClient);
        final long heartbeatIntervalMs = this.sourceConfig.getLong(MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG).longValue();
        final String heartbeatTopic = this.sourceConfig.getString(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG);
        this.heartbeatManager = new HeartbeatManager(this.time, this.cursor, heartbeatIntervalMs, heartbeatTopic, this.partitionMap);
    }

    /**
     * Opens a change stream cursor, resuming from the current resume token when one exists.
     *
     * @param mongoSourceConfig connector configuration
     * @param mongoClient client used to open the change stream
     * @return the cursor, or null when it could not be created
     */
    @Nullable
    MongoChangeStreamCursor<? extends BsonDocument> createCursor(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        MongoSourceTask.LOGGER.debug("Creating a MongoCursor");
        final BsonDocument resumeToken = getResumeToken(mongoSourceConfig);
        return tryCreateCursor(mongoSourceConfig, mongoClient, resumeToken);
    }

    /**
     * Logs a data-loss warning, marks the cursor as invalidated and opens a fresh change
     * stream without any resume token.
     *
     * @param mongoException the failure that made the previous change stream unusable
     * @return the new cursor, or null when it could not be created
     */
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> tryRecreateCursor(MongoException mongoException) {
        final String errorMessage;
        final int errorCode;
        if (mongoException instanceof MongoCommandException) {
            MongoCommandException commandException = (MongoCommandException) mongoException;
            errorMessage = commandException.getErrorMessage();
            errorCode = commandException.getErrorCode();
        } else {
            errorMessage = mongoException.getMessage();
            errorCode = mongoException.getCode();
        }
        MongoSourceTask.LOGGER.warn("Failed to resume change stream: {} {}\n===================================================================================\nWhen the resume token is no longer available there is the potential for data loss.\n\nRestarting the change stream with no resume token because `errors.tolerance=all`.\n===================================================================================\n", errorMessage, Integer.valueOf(errorCode));
        this.invalidatedCursor = true;
        return tryCreateCursor(this.sourceConfig, this.mongoClient, null);
    }

    /**
     * Attempts to open a change stream cursor, retrying with a different start position when
     * the server rejects the first attempt.
     *
     * <p>Start position selection:
     * <ul>
     *   <li>resume token present and {@code startAfter} still believed supported: {@code startAfter};</li>
     *   <li>no resume token, or the cursor is flagged as invalidated: no offset, except that in
     *       TIMESTAMP startup mode a configured operation time is applied;</li>
     *   <li>otherwise: {@code resumeAfter}.</li>
     * </ul>
     *
     * <p>On a {@link MongoCommandException} with a resume token, the method calls itself again
     * after adjusting state (dropping the token, or disabling {@code startAfter}), or delegates
     * to {@link #tryRecreateCursor} when errors are tolerated.
     *
     * @param mongoSourceConfig connector configuration
     * @param mongoClient client used to open the change stream
     * @param bsonDocument resume token, or null to start without one
     * @return the cursor, or null when the namespace was not found or the failure was not fatal
     * @throws ConnectException for an illegal change stream operation, an unknown-field error,
     *     or a failure that {@link #changeStreamNotValid} classifies as an invalid change stream
     */
    @Nullable
    private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient, BsonDocument bsonDocument) {
        try {
            ChangeStreamIterable<Document> changeStreamIterable = getChangeStreamIterable(mongoSourceConfig, mongoClient);
            if (bsonDocument != null && this.supportsStartAfter) {
                MongoSourceTask.LOGGER.info("Resuming the change stream after the previous offset: {}", bsonDocument);
                changeStreamIterable.startAfter(bsonDocument);
            } else if (bsonDocument == null || this.invalidatedCursor) {
                // No usable resume position: start fresh, optionally at a configured timestamp.
                MongoSourceConfig.StartupConfig startupConfig = mongoSourceConfig.getStartupConfig();
                if (startupConfig.startupMode() == MongoSourceConfig.StartupConfig.StartupMode.TIMESTAMP) {
                    Optional<BsonTimestamp> startAtOperationTime = startupConfig.timestampConfig().startAtOperationTime();
                    if (startAtOperationTime.isPresent()) {
                        MongoSourceTask.LOGGER.info("New change stream cursor created without offset but at the configured operation time.");
                        changeStreamIterable.startAtOperationTime(startAtOperationTime.get());
                    } else {
                        MongoSourceTask.LOGGER.info("New change stream cursor created without offset.");
                    }
                } else {
                    MongoSourceTask.LOGGER.info("New change stream cursor created without offset.");
                }
            } else {
                // startAfter was previously rejected by the server: fall back to resumeAfter.
                MongoSourceTask.LOGGER.info("Resuming the change stream after the previous offset using resumeAfter: {}", bsonDocument);
                changeStreamIterable.resumeAfter(bsonDocument);
            }
            return changeStreamIterable.withDocumentClass(RawBsonDocument.class).cursor();
        } catch (MongoCommandException e) {
            // Recovery paths below only apply when a resume token was actually used.
            if (bsonDocument != null) {
                if (invalidatedResumeToken(e)) {
                    // Token was invalidated: retry without it.
                    this.invalidatedCursor = true;
                    return tryCreateCursor(mongoSourceConfig, mongoClient, null);
                }
                if (MongoSourceTask.doesNotSupportsStartAfter(e)) {
                    // Remember that startAfter is unavailable and retry with the same token.
                    this.supportsStartAfter = false;
                    return tryCreateCursor(mongoSourceConfig, mongoClient, bsonDocument);
                }
                if (mongoSourceConfig.tolerateErrors() && changeStreamNotValid(e)) {
                    return tryRecreateCursor(e);
                }
            }
            if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
                MongoSourceTask.LOGGER.info("Namespace not found cursor closed.");
                return null;
            }
            if (e.getErrorCode() == ILLEGAL_OPERATION_ERROR) {
                MongoSourceTask.LOGGER.error("Illegal $changeStream operation: {} {}\n\n=====================================================================================\n{}\n\nPlease Note: Not all aggregation pipeline operations are suitable for modifying the\nchange stream output. For more information, please see the official documentation:\n   https://docs.mongodb.com/manual/changeStreams/\n=====================================================================================\n", new Object[]{e.getErrorMessage(), Integer.valueOf(e.getErrorCode()), e.getErrorMessage()});
                throw new ConnectException("Illegal $changeStream operation", e);
            }
            if (e.getErrorCode() == UNKNOWN_FIELD_ERROR) {
                String format = String.format("Invalid operation: %s %s. It is likely that you are trying to use functionality unsupported by your version of MongoDB.", e.getErrorMessage(), Integer.valueOf(e.getErrorCode()));
                MongoSourceTask.LOGGER.error(format);
                throw new ConnectException(format, e);
            }
            MongoSourceTask.LOGGER.warn("Failed to resume change stream: {} {}\n\n=====================================================================================\nIf the resume token is no longer available then there is the potential for data loss.\nSaved resume tokens are managed by Kafka and stored with the offset data.\n\nTo restart the change stream with no resume token either: \n  * Create a new partition name using the `offset.partition.name` configuration.\n  * Set `errors.tolerance=all` and ignore the erroring resume token. \n  * Manually remove the old offset from its configured storage.\n\nResetting the offset will allow for the connector to be resume from the latest resume\ntoken. Using `startup.mode = copy_existing` ensures that all data will be outputted by the\nconnector but it will duplicate existing data.\n=====================================================================================\n", e.getErrorMessage(), Integer.valueOf(e.getErrorCode()));
            if (changeStreamNotValid(e)) {
                throw new ConnectException("ResumeToken not found. Cannot create a change stream cursor", e);
            }
            // Any other command failure is only logged; the caller sees "no cursor".
            return null;
        }
    }

    /**
     * Tells whether a command failure carries the invalidated-resume-token error code.
     *
     * @param mongoCommandException the command failure to inspect
     * @return true when the error code equals {@code INVALIDATED_RESUME_TOKEN_ERROR}
     */
    private static boolean invalidatedResumeToken(MongoCommandException mongoCommandException) {
        final int errorCode = mongoCommandException.getErrorCode();
        return INVALIDATED_RESUME_TOKEN_ERROR == errorCode;
    }

    /**
     * Decides whether an error means the change stream can no longer be resumed.
     *
     * <p>The error qualifies when its code is one of {@code INVALID_CHANGE_STREAM_ERRORS}, or
     * when its message (case-insensitively) mentions a resume token or resume point together
     * with a phrase saying it is missing or invalid.
     *
     * @param mongoException the error to classify
     * @return true when the change stream should be considered invalid; false otherwise,
     *     including when the error has no message to inspect
     */
    private static boolean changeStreamNotValid(MongoException mongoException) {
        if (INVALID_CHANGE_STREAM_ERRORS.contains(mongoException.getCode())) {
            return true;
        }
        String rawMessage = mongoException instanceof MongoCommandException
                ? ((MongoCommandException) mongoException).getErrorMessage()
                : mongoException.getMessage();
        if (rawMessage == null) {
            // Throwable#getMessage may be null; an NPE here would mask the original failure.
            return false;
        }
        String message = rawMessage.toLowerCase(Locale.ROOT);
        boolean mentionsResumePosition = message.contains(RESUME_TOKEN) || message.contains(RESUME_POINT);
        boolean reportsItUnusable = message.contains(NOT_FOUND)
                || message.contains(DOES_NOT_EXIST)
                || message.contains(INVALID_RESUME_TOKEN)
                || message.contains(NO_LONGER_IN_THE_OPLOG);
        return mentionsResumePosition && reportsItUnusable;
    }

    /**
     * Captures the current change stream position before copying starts.
     *
     * <p>A short-lived cursor is opened and polled once. If an event is already available it
     * is cached (wrapped as a {@link BsonDocument}) together with its resume token; otherwise
     * the cursor's own resume token is cached. The cursor is always closed, including when
     * polling fails.
     *
     * @throws ConnectException when the change stream fails for any reason other than the
     *     namespace not being found
     */
    private void setCachedResultAndResumeToken() {
        try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> probeCursor =
                getChangeStreamIterable(this.sourceConfig, this.mongoClient).cursor()) {
            ChangeStreamDocument<Document> firstResult = probeCursor.tryNext();
            if (firstResult != null) {
                this.cachedResult = new BsonDocumentWrapper<>(firstResult, ChangeStreamDocument.createCodec(Document.class, MongoClientSettings.getDefaultCodecRegistry()));
                this.cachedResumeToken = firstResult.getResumeToken();
            } else {
                this.cachedResumeToken = probeCursor.getResumeToken();
            }
        } catch (MongoCommandException e) {
            // A missing namespace is tolerated: nothing is cached in that case.
            if (e.getErrorCode() != NAMESPACE_NOT_FOUND_ERROR) {
                throw new ConnectException(e);
            }
        }
    }

    /**
     * Reads the next batch of documents, from the copy manager while copying and from the
     * change stream cursor afterwards.
     *
     * <p>While copying: when the copy manager reports that it has finished, the task leaves
     * copy mode and the cached first change event (if any) is put at the front of the batch;
     * the copy manager is then drained until it yields nothing or the batch is full.
     *
     * <p>While streaming: the cursor is (re)created when missing or closed by the server, then
     * read until it yields nothing, the batch is full, or no further documents are locally
     * available. A read failure closes the cursor and ends the batch; whatever was read before
     * the failure is still returned when the failure is tolerated.
     *
     * @return the batch, possibly empty, never null
     * @throws ConnectException when reading fails while the task is running and the failure is
     *     not tolerated, or is not a {@link MongoException}
     */
    private List<BsonDocument> getNextBatch() {
        List<BsonDocument> batch = new ArrayList<>();
        long maxBatchSize = this.sourceConfig.getInt(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG).intValue();

        if (this.isCopying) {
            Assertions.assertNotNull(this.copyDataManager);
            if (!this.copyDataManager.isCopying()) {
                this.isCopying = false;
                MongoSourceTask.LOGGER.info("Finished copying existing data from the collection(s).");
                if (this.cachedResult != null) {
                    batch.add(this.cachedResult);
                    this.cachedResult = null;
                }
            }
            Optional<BsonDocument> copied;
            do {
                copied = this.copyDataManager.poll();
                copied.ifPresent(batch::add);
            } while (copied.isPresent() && batch.size() < maxBatchSize);
            return batch;
        }

        if (this.cursor == null) {
            initializeCursorAndHeartbeatManager();
        } else if (this.cursor.getServerCursor() == null) {
            MongoSourceTask.LOGGER.info("Cursor has been closed by the server - reinitializing");
            invalidateCursorAndReinitialize();
        }
        if (this.cursor == null) {
            MongoSourceTask.LOGGER.info("Unable to recreate the cursor");
            return batch;
        }

        // The try wraps the whole read loop so that, once a handler has closed (and nulled)
        // the cursor, the loop condition is never evaluated against it again.
        try {
            BsonDocument next;
            do {
                next = this.cursor.tryNext();
                if (next != null) {
                    batch.add(next);
                }
            } while (next != null && batch.size() < maxBatchSize && this.cursor.available() > 0);
        } catch (MongoException e) {
            // Must precede the generic handler: MongoException is a subtype of Exception.
            closeCursor();
            if (this.isRunning) {
                if (!this.sourceConfig.tolerateErrors()) {
                    throw new ConnectException("An exception occurred when trying to get the next item from the Change Stream: " + e.getMessage(), e);
                }
                if (changeStreamNotValid(e)) {
                    // NOTE(review): the heartbeat manager keeps the cursor it was built with;
                    // confirm whether it should be recreated alongside the cursor here.
                    this.cursor = tryRecreateCursor(e);
                } else {
                    MongoSourceTask.LOGGER.error("An exception occurred when trying to get the next item from the Change Stream", e);
                }
            }
        } catch (Exception e) {
            closeCursor();
            if (this.isRunning) {
                throw new ConnectException("Unexpected error: " + e.getMessage(), e);
            }
        }
        return batch;
    }

    /**
     * Closes the current cursor, if any, and forgets it.
     *
     * <p>Closing is best-effort: a failure to close is logged at debug level and otherwise
     * ignored, because the cursor is being discarded anyway.
     */
    private void closeCursor() {
        if (this.cursor == null) {
            return;
        }
        try {
            this.cursor.close();
        } catch (Exception e) {
            MongoSourceTask.LOGGER.debug("Ignoring exception while closing the change stream cursor", e);
        }
        this.cursor = null;
    }

    /**
     * Flags the cursor as invalidated, closes and drops the current cursor if there is one,
     * then creates a new cursor and heartbeat manager.
     */
    private void invalidateCursorAndReinitialize() {
        this.invalidatedCursor = true;
        final MongoChangeStreamCursor<? extends BsonDocument> staleCursor = this.cursor;
        if (staleCursor != null) {
            staleCursor.close();
            this.cursor = null;
        }
        initializeCursorAndHeartbeatManager();
    }

    /**
     * Builds a configured change stream for the cluster, a database or a single collection.
     *
     * <p>The scope follows the configuration: no database name watches the whole cluster, a
     * database name without a collection name watches the database, and both names watch the
     * collection. The configured pipeline is applied when present. Max await time, batch size
     * (only when positive), full-document-before-change, full-document and collation settings
     * are then applied in that order.
     *
     * @param mongoSourceConfig connector configuration
     * @param mongoClient client used to create the change stream
     * @return the configured, not yet opened change stream
     */
    private static ChangeStreamIterable<Document> getChangeStreamIterable(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        final String databaseName = mongoSourceConfig.getString("database");
        final String collectionName = mongoSourceConfig.getString("collection");
        final Optional<List<Document>> pipeline = mongoSourceConfig.getPipeline();

        final ChangeStreamIterable<Document> changeStream;
        if (!databaseName.isEmpty() && !collectionName.isEmpty()) {
            MongoSourceTask.LOGGER.info("Watching for collection changes on '{}.{}'", databaseName, collectionName);
            MongoCollection<Document> watchedCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
            changeStream = pipeline.map(watchedCollection::watch).orElse(watchedCollection.watch());
        } else if (!databaseName.isEmpty()) {
            MongoSourceTask.LOGGER.info("Watching for database changes on '{}'", databaseName);
            MongoDatabase watchedDatabase = mongoClient.getDatabase(databaseName);
            changeStream = pipeline.map(watchedDatabase::watch).orElse(watchedDatabase.watch());
        } else {
            MongoSourceTask.LOGGER.info("Watching all changes on the cluster");
            changeStream = pipeline.map(mongoClient::watch).orElse(mongoClient.watch());
        }

        final long maxAwaitMs = mongoSourceConfig.getLong(MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG).longValue();
        changeStream.maxAwaitTime(maxAwaitMs, TimeUnit.MILLISECONDS);

        final int batchSize = mongoSourceConfig.getInt(MongoSourceConfig.BATCH_SIZE_CONFIG).intValue();
        if (batchSize > 0) {
            changeStream.batchSize(batchSize);
        }

        mongoSourceConfig.getFullDocumentBeforeChange().ifPresent(changeStream::fullDocumentBeforeChange);
        mongoSourceConfig.getFullDocument().ifPresent(changeStream::fullDocument);
        mongoSourceConfig.getCollation().ifPresent(changeStream::collation);
        return changeStream;
    }

    /**
     * Determines the resume token for the next cursor, consuming one-shot state on the way.
     *
     * <p>Precedence: the cached resume token (cleared once returned); then, if the cursor was
     * flagged as invalidated, no token (the flag is reset); otherwise the {@code _id} of the
     * stored source offset, unless that offset is missing or marked as a copy offset.
     *
     * @param mongoSourceConfig connector configuration used to look up the stored offset
     * @return the resume token, or null when the change stream should start without one
     */
    private BsonDocument getResumeToken(MongoSourceConfig mongoSourceConfig) {
        if (this.cachedResumeToken != null) {
            final BsonDocument cachedToken = this.cachedResumeToken;
            this.cachedResumeToken = null;
            return cachedToken;
        }
        if (this.invalidatedCursor) {
            this.invalidatedCursor = false;
            return null;
        }

        final Map<String, Object> storedOffset = MongoSourceTask.getOffset(this.sourceTaskContextAccessor.get(), mongoSourceConfig);
        if (storedOffset == null || !storedOffset.containsKey("_id") || storedOffset.containsKey("copy")) {
            return null;
        }
        final BsonDocument storedToken = BsonDocument.parse((String) storedOffset.get("_id"));
        if (storedOffset.containsKey(HeartbeatManager.HEARTBEAT_KEY)) {
            MongoSourceTask.LOGGER.info("Resume token from heartbeat: {}", storedToken);
        }
        return storedToken;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records the outcome of a published record in the current statistics.
     *
     * @param sourceRecord the record that was committed (not inspected)
     * @param recordMetadata producer metadata; null is counted as a filtered record, anything
     *     else as an acknowledged record
     */
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) {
        final SourceTaskStatistics stats = this.statisticsManager.currentStatistics();
        if (recordMetadata != null) {
            stats.getRecordsAcknowledged().sample(1L);
            return;
        }
        stats.getRecordsFiltered().sample(1L);
    }
}
