package io.debezium.connector.mongodb;

import com.mongodb.MongoChangeStreamException;
import com.mongodb.MongoCommandException;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.DebeziumException;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.recordemitter.MongoDbSnapshotRecordEmitter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.bson.BsonDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource.class */
public class MongoDbSnapshotChangeEventSource extends AbstractSnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class);

    /** Connector configuration; read for the snapshot mode, the snapshot thread limit and per-collection filter queries. */
    private final MongoDbConnectorConfig connectorConfig;
    /** Task context; supplies the server name, the logging context and the connector configuration used for the fetch size. */
    private final MongoDbTaskContext taskContext;
    /** Factory used to open one connection per replica set. */
    private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
    /** Connection context obtained from the task context at construction time. */
    private final ConnectionContext connectionContext;
    /** Replica sets this source may snapshot. */
    private final ReplicaSets replicaSets;
    /** Dispatcher that receives the snapshot events. */
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    /** Clock used for durations and event timestamps. */
    private final Clock clock;
    /** Listener notified about the monitored collections and about each completed collection. */
    private final SnapshotProgressListener<MongoDbPartition> snapshotProgressListener;
    /** Receives throwables raised by the replica-set snapshot workers. */
    private final ErrorHandler errorHandler;
    /**
     * Set to {@code true} when waiting for the replica-set workers is interrupted.
     * The reference is assigned exactly once, in the constructor, hence {@code final}.
     */
    private final AtomicBoolean aborted;

    /**
     * Snapshot context that additionally tracks whether the collection and the record currently being
     * exported are the last ones.
     *
     * NOTE(review): both flags are plain (non-volatile) fields, yet they are written from pool threads
     * when more than one snapshot thread is configured — confirm that this sharing is intended.
     */
    public static class MongoDbSnapshotContext extends AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> {
        // Reset to false when a replica-set snapshot begins; set to true once the final collection is picked up.
        public boolean lastCollection;
        // Reset to false before a collection is read; afterwards true exactly when the cursor has no further document.
        public boolean lastRecordInCollection;

        MongoDbSnapshotContext(MongoDbPartition mongoDbPartition) {
            super(mongoDbPartition);
        }
    }

    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbSnapshotChangeEventSource$MongoDbSnapshottingTask.class */
    public static class MongoDbSnapshottingTask extends AbstractSnapshotChangeEventSource.SnapshottingTask {
        private final List<ReplicaSet> replicaSetsToSnapshot;

        public MongoDbSnapshottingTask(List<ReplicaSet> list) {
            super(false, !list.isEmpty());
            this.replicaSetsToSnapshot = list;
        }

        public List<ReplicaSet> getReplicaSetsToSnapshot() {
            return Collections.unmodifiableList(this.replicaSetsToSnapshot);
        }

        public boolean shouldSkipSnapshot() {
            return !snapshotData();
        }

        public String toString() {
            return "SnapshottingTask [replicaSetsToSnapshot=" + this.replicaSetsToSnapshot + "]";
        }
    }

    /**
     * Creates the snapshot source and stores its collaborators.
     *
     * @param mongoDbConnectorConfig             connector configuration, also passed to the super constructor
     * @param mongoDbTaskContext                 task context; its connection context is captured here
     * @param changeEventSourceConnectionFactory factory for per-replica-set connections
     * @param replicaSets                        replica sets that may be snapshotted
     * @param eventDispatcher                    dispatcher receiving the snapshot events
     * @param clock                              clock used for durations and timestamps
     * @param snapshotProgressListener           progress listener, also passed to the super constructor
     * @param errorHandler                       receiver of worker failures
     */
    public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig mongoDbConnectorConfig, MongoDbTaskContext mongoDbTaskContext, MongoDbConnection.ChangeEventSourceConnectionFactory changeEventSourceConnectionFactory, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> eventDispatcher, Clock clock, SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, ErrorHandler errorHandler) {
        super(mongoDbConnectorConfig, snapshotProgressListener);

        // Configuration and task-scoped state.
        this.connectorConfig = mongoDbConnectorConfig;
        this.taskContext = mongoDbTaskContext;
        this.connectionContext = mongoDbTaskContext.getConnectionContext();

        // Connectivity.
        this.connections = changeEventSourceConnectionFactory;
        this.replicaSets = replicaSets;

        // Event pipeline.
        this.dispatcher = eventDispatcher;
        this.snapshotProgressListener = snapshotProgressListener;
        this.errorHandler = errorHandler;
        this.clock = clock;

        // Nothing has been aborted yet.
        this.aborted = new AtomicBoolean(false);
    }

    /**
     * Runs the snapshot: initialises an empty offset context, then snapshots every requested replica
     * set on its own pool thread and waits for all of them to finish.
     *
     * <p>A failure inside a worker is logged and handed to the error handler; it does not by itself
     * mark the snapshot as aborted. Only an interruption of the waiting thread does.
     *
     * @param changeEventSourceContext context consulted by the workers to detect a stop request
     * @param mongoDbOffsetContext     previous offset, may be {@code null}
     * @param snapshotContext          context created by {@code prepare}; must be a {@link MongoDbSnapshotContext}
     * @param snapshottingTask         task created by {@code getSnapshottingTask}; must be a {@link MongoDbSnapshottingTask}
     * @return an aborted result when the wait was interrupted, otherwise a completed result carrying the context's offset
     */
    protected SnapshotResult<MongoDbOffsetContext> doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbOffsetContext mongoDbOffsetContext, AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) {
        final MongoDbSnapshottingTask mongoTask = (MongoDbSnapshottingTask) snapshottingTask;
        final MongoDbSnapshotContext mongoContext = (MongoDbSnapshotContext) snapshotContext;

        LOGGER.info("Snapshot step 1 - Preparing");
        if (mongoDbOffsetContext != null && mongoDbOffsetContext.isSnapshotRunning()) {
            LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
        }

        LOGGER.info("Snapshot step 2 - Determining snapshot offsets");
        final List<ReplicaSet> replicaSetsToSnapshot = mongoTask.getReplicaSetsToSnapshot();
        initSnapshotStartOffsets(mongoContext);

        final int workerCount = replicaSetsToSnapshot.size();
        LOGGER.info("Starting {} thread(s) to snapshot replica sets: {}", workerCount, replicaSetsToSnapshot);
        final ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-snapshot", workerCount);
        final CountDownLatch latch = new CountDownLatch(workerCount);

        LOGGER.info("Snapshot step 3 - Snapshotting data");
        try {
            for (ReplicaSet replicaSet : replicaSetsToSnapshot) {
                executor.submit(() -> {
                    try {
                        this.taskContext.configureLoggingContext(replicaSet.replicaSetName());
                        snapshotReplicaSet(changeEventSourceContext, mongoContext, replicaSet);
                    } catch (Throwable t) {
                        LOGGER.error("Snapshot for replica set {} failed", replicaSet.replicaSetName(), t);
                        this.errorHandler.setProducerThrowable(t);
                    } finally {
                        // Exactly one count-down per worker, whatever the outcome.
                        latch.countDown();
                    }
                });
            }

            try {
                latch.await();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to callers and report the snapshot as aborted.
                Thread.currentThread().interrupt();
                this.aborted.set(true);
            }
        } finally {
            // Runs even if submitting a task failed, so the pool threads are always released.
            executor.shutdown();
        }

        if (this.aborted.get()) {
            return SnapshotResult.aborted();
        }
        return SnapshotResult.completed(snapshotContext.offset);
    }

    /**
     * Decides which replica sets need a snapshot.
     *
     * <ul>
     * <li>snapshot mode {@code NEVER}: none;</li>
     * <li>no previous offset: all replica sets;</li>
     * <li>otherwise: those for which {@code isSnapshotExpected} returns {@code true}.</li>
     * </ul>
     *
     * @param mongoDbPartition     partition used when probing stored resume tokens
     * @param mongoDbOffsetContext previous offset, may be {@code null}
     * @return the task listing the replica sets to snapshot
     */
    public AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(MongoDbPartition mongoDbPartition, MongoDbOffsetContext mongoDbOffsetContext) {
        final boolean snapshotsDisabled = this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER);
        if (snapshotsDisabled) {
            LOGGER.info("According to the connector configuration, no snapshot will occur.");
            return new MongoDbSnapshottingTask(Collections.emptyList());
        }

        if (mongoDbOffsetContext == null) {
            LOGGER.info("No previous offset has been found");
            return new MongoDbSnapshottingTask(this.replicaSets.all());
        }

        final List<ReplicaSet> pending = this.replicaSets.all().stream()
                .filter(replicaSet -> isSnapshotExpected(mongoDbPartition, replicaSet, mongoDbOffsetContext))
                .collect(Collectors.toList());
        return new MongoDbSnapshottingTask(pending);
    }

    /**
     * Creates the snapshot context for the given partition.
     *
     * @param mongoDbPartition partition handed to the new context
     * @return a new {@link MongoDbSnapshotContext}
     */
    public AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> prepare(MongoDbPartition mongoDbPartition) {
        return new MongoDbSnapshotContext(mongoDbPartition);
    }

    /**
     * Opens a connection to the replica set, emits its snapshot events and closes the connection.
     *
     * <p>try-with-resources closes the connection exactly once on every path; a failure while closing
     * after an earlier failure is attached to that earlier failure as a suppressed exception.
     *
     * @param changeEventSourceContext context used to detect a stop request
     * @param mongoDbSnapshotContext   snapshot context providing the partition and the offset
     * @param replicaSet               replica set to snapshot
     * @throws InterruptedException if the snapshot is interrupted
     */
    private void snapshotReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSet replicaSet) throws InterruptedException {
        try (MongoDbConnection mongoDbConnection = this.connections.get(replicaSet, mongoDbSnapshotContext.partition)) {
            createDataEvents(changeEventSourceContext, mongoDbSnapshotContext, replicaSet, mongoDbConnection);
        }
    }

    /**
     * Tells whether the given replica set has to be snapshotted.
     *
     * @param mongoDbPartition     partition used when probing the stored resume token
     * @param replicaSet           replica set under consideration
     * @param mongoDbOffsetContext previous offset holding the per-replica-set state
     * @return {@code true} when no offset is stored, when the previous snapshot did not finish, or
     *         when the resume-token probe asks for a snapshot
     */
    private boolean isSnapshotExpected(MongoDbPartition mongoDbPartition, ReplicaSet replicaSet, MongoDbOffsetContext mongoDbOffsetContext) {
        final ReplicaSetOffsetContext storedOffset = mongoDbOffsetContext.getReplicaSetOffsetContext(replicaSet);

        final boolean offsetMissing = !storedOffset.hasOffset();
        if (offsetMissing) {
            LOGGER.info("No existing offset found for replica set '{}', starting snapshot", storedOffset.getReplicaSetName());
            return true;
        }

        final boolean snapshotUnfinished = storedOffset.isSnapshotOngoing();
        if (snapshotUnfinished) {
            LOGGER.info("The previous snapshot was incomplete for '{}', so restarting the snapshot", storedOffset.getReplicaSetName());
            return true;
        }

        // An offset exists and the last snapshot finished: the stored resume token decides.
        LOGGER.info("Found existing offset for replica set '{}' at {}", storedOffset.getReplicaSetName(), storedOffset.getOffset());
        final BsonDocument resumeToken = storedOffset.lastResumeTokenDoc();
        return isValidResumeToken(mongoDbPartition, replicaSet, resumeToken);
    }

    /**
     * Probes whether a change stream can be resumed from the stored token.
     *
     * <p><b>Note the return convention:</b> despite the method name, the result answers "is a snapshot
     * needed?". It is {@code false} when the token is {@code null} or when a cursor could be opened
     * with it, and {@code true} when opening the cursor fails with a {@link MongoCommandException} or
     * {@link MongoChangeStreamException}.
     *
     * <p>The connection and the probe cursor are closed on every path, including when the probe throws.
     *
     * @param mongoDbPartition partition used to obtain the connection
     * @param replicaSet       replica set whose token is probed
     * @param bsonDocument     stored resume token, may be {@code null}
     * @return {@code true} when the token cannot be used and a snapshot has to be performed
     * @throws DebeziumException if the thread is interrupted while probing; the interrupt flag is restored
     */
    private boolean isValidResumeToken(MongoDbPartition mongoDbPartition, ReplicaSet replicaSet, BsonDocument bsonDocument) {
        if (bsonDocument == null) {
            return false;
        }
        try (MongoDbConnection mongoDbConnection = this.connections.get(replicaSet, mongoDbPartition)) {
            return mongoDbConnection.execute("Checking change stream", mongoClient -> {
                final ChangeStreamIterable<BsonDocument> changeStream = mongoClient.watch(BsonDocument.class);
                changeStream.resumeAfter(bsonDocument);

                // Opening the cursor is the actual probe; the cursor itself is not read.
                try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> ignored = changeStream.cursor()) {
                    LOGGER.info("Valid resume token present for replica set '{}', so no snapshot will be performed", replicaSet.replicaSetName());
                    return false;
                } catch (MongoCommandException | MongoChangeStreamException e) {
                    LOGGER.info("Invalid resume token present for replica set '{}', snapshot will be performed", replicaSet.replicaSetName());
                    return true;
                }
            });
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DebeziumException("Interrupted while creating snapshotting task", e);
        }
    }

    /**
     * Installs a fresh, empty offset context on the snapshot context.
     *
     * @param mongoDbSnapshotContext context whose {@code offset} field is replaced
     */
    private void initSnapshotStartOffsets(MongoDbSnapshotContext mongoDbSnapshotContext) {
        LOGGER.info("Initializing empty Offset context");
        final SourceInfo sourceInfo = new SourceInfo(this.connectorConfig);
        final TransactionContext transactionContext = new TransactionContext();
        mongoDbSnapshotContext.offset = new MongoDbOffsetContext(sourceInfo, transactionContext, new MongoDbIncrementalSnapshotContext(false));
    }

    /**
     * Initialises the start offset of one replica set from a freshly opened change-stream cursor.
     *
     * <p>The cursor is only needed for {@code initEvent}; it is closed exactly once via
     * try-with-resources before {@code initFromOpTimeIfNeeded} runs.
     *
     * @param mongoDbSnapshotContext context holding the offset to initialise
     * @param replicaSet             replica set whose offset is initialised
     * @param mongoDbConnection      open connection to that replica set
     * @throws InterruptedException if the operation is interrupted
     */
    private void initReplicaSetSnapshotStartOffsets(MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSet replicaSet, MongoDbConnection mongoDbConnection) throws InterruptedException {
        LOGGER.info("Determine Snapshot start offset for replica-set {}", replicaSet.replicaSetName());
        final ReplicaSetOffsetContext replicaSetOffsetContext = mongoDbSnapshotContext.offset.getReplicaSetOffsetContext(replicaSet);
        mongoDbConnection.execute("Setting resume token", mongoClient -> {
            try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor = mongoClient.watch(BsonDocument.class).cursor()) {
                replicaSetOffsetContext.initEvent(cursor);
            }
            replicaSetOffsetContext.initFromOpTimeIfNeeded(mongoClient);
        });
    }

    /**
     * Drives the snapshot of one replica set: initialises its start offset, emits the data events and
     * walks the offset context through its snapshot start and completion callbacks.
     *
     * @param changeEventSourceContext context used to detect a stop request
     * @param mongoDbSnapshotContext   snapshot context providing the offset
     * @param replicaSet               replica set to snapshot
     * @param mongoDbConnection        open connection to that replica set
     * @throws InterruptedException if the snapshot is interrupted
     */
    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, ReplicaSet replicaSet, MongoDbConnection mongoDbConnection) throws InterruptedException {
        initReplicaSetSnapshotStartOffsets(mongoDbSnapshotContext, replicaSet, mongoDbConnection);

        final EventDispatcher.SnapshotReceiver<MongoDbPartition> receiver = this.dispatcher.getSnapshotChangeEventReceiver();
        final MongoDbOffsetContext offsetContext = mongoDbSnapshotContext.offset;

        // Start marker, then the data itself.
        offsetContext.preSnapshotStart();
        createDataEventsForReplicaSet(changeEventSourceContext, mongoDbSnapshotContext, receiver, replicaSet, mongoDbConnection);

        // Completion: the receiver is completed between the pre- and post-completion callbacks.
        offsetContext.preSnapshotCompletion();
        receiver.completeSnapshot();
        offsetContext.postSnapshotCompletion();
    }

    /**
     * Emits snapshot events for every collection of a replica set that has to be snapshotted.
     *
     * <p>When more than one snapshot thread is configured, the collections are drained from a shared
     * queue by a pool of workers; otherwise they are processed sequentially on the calling thread.
     *
     * <p>In the parallel case each worker counts the latch down exactly once, when it exits, so the
     * calling thread continues only after every worker has finished. An interrupted worker makes the
     * remaining workers stop after their current collection. A runtime failure in a worker does the
     * same and is reported to the error handler instead of being lost in the discarded future.
     *
     * @param changeEventSourceContext context used to detect a stop request
     * @param mongoDbSnapshotContext   snapshot context providing partition, offset and the "last" flags
     * @param snapshotReceiver         receiver for the dispatched snapshot events
     * @param replicaSet               replica set to snapshot
     * @param mongoDbConnection        open connection to that replica set
     * @throws InterruptedException if the source is stopped while collections are processed sequentially
     */
    private void createDataEventsForReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, MongoDbConnection mongoDbConnection) throws InterruptedException {
        final String replicaSetName = replicaSet.replicaSetName();
        final MongoDbOffsetContext offsetContext = mongoDbSnapshotContext.offset;
        final ReplicaSetOffsetContext replicaSetOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);

        mongoDbSnapshotContext.lastCollection = false;
        offsetContext.startReplicaSetSnapshot(replicaSetName);
        LOGGER.info("Beginning snapshot of '{}' at {}", replicaSetName, replicaSetOffsetContext.getOffset());

        final List<CollectionId> collections = determineDataCollectionsToBeSnapshotted(mongoDbConnection.collections()).collect(Collectors.toList());
        this.snapshotProgressListener.monitoredDataCollectionsDetermined(mongoDbSnapshotContext.partition, collections);

        final int maxThreads = this.connectorConfig.getSnapshotMaxThreads();
        if (maxThreads > 1) {
            final int workerCount = Math.min(collections.size(), maxThreads);
            final ConcurrentLinkedQueue<CollectionId> pending = new ConcurrentLinkedQueue<>(collections);
            final String poolName = "snapshot-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main");
            final ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), poolName, maxThreads);
            final CountDownLatch latch = new CountDownLatch(workerCount);
            final AtomicBoolean workersAborted = new AtomicBoolean(false);
            final AtomicInteger workerIds = new AtomicInteger(0);
            LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", workerCount, collections.size(), Strings.join(", ", collections));

            try {
                for (int i = 0; i < workerCount; i++) {
                    executor.submit(() -> {
                        try {
                            this.taskContext.configureLoggingContext(replicaSetName + "-snapshot" + workerIds.incrementAndGet());
                            CollectionId collectionId;
                            while (!workersAborted.get() && (collectionId = pending.poll()) != null) {
                                if (!changeEventSourceContext.isRunning()) {
                                    throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSetName);
                                }
                                if (pending.isEmpty()) {
                                    mongoDbSnapshotContext.lastCollection = true;
                                }
                                createDataEventsForCollection(changeEventSourceContext, mongoDbSnapshotContext, snapshotReceiver, replicaSet, collectionId, mongoDbConnection);
                            }
                        } catch (InterruptedException e) {
                            // Stop this worker and ask the others to stop after their current collection.
                            workersAborted.set(true);
                        } catch (RuntimeException e) {
                            // Report the failure; without this it would vanish with the unobserved future.
                            workersAborted.set(true);
                            LOGGER.error("Snapshot for replica set {} failed", replicaSetName, e);
                            this.errorHandler.setProducerThrowable(e);
                        } finally {
                            // One count-down per worker, matching the latch size.
                            latch.countDown();
                        }
                    });
                }

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    workersAborted.set(true);
                }
            } finally {
                // Runs even if submitting a task failed, so the pool threads are always released.
                executor.shutdown();
            }
        } else {
            final Iterator<CollectionId> remaining = collections.iterator();
            while (remaining.hasNext()) {
                final CollectionId collectionId = remaining.next();
                if (!changeEventSourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSetName);
                }
                if (!remaining.hasNext()) {
                    mongoDbSnapshotContext.lastCollection = true;
                }
                createDataEventsForCollection(changeEventSourceContext, mongoDbSnapshotContext, snapshotReceiver, replicaSet, collectionId, mongoDbConnection);
            }
        }

        offsetContext.stopReplicaSetSnapshot(replicaSetName);
    }

    /**
     * Exports one collection: iterates over the documents matching the configured filter and
     * dispatches a snapshot event for each of them.
     *
     * <p>The record that is last in the last collection is flagged via
     * {@code markSnapshotRecord(SnapshotRecord.LAST)} before it is dispatched; when the last collection
     * yields no documents the flag is set without dispatching anything. The cursor is closed exactly
     * once via try-with-resources.
     *
     * @param changeEventSourceContext context used to detect a stop request between documents
     * @param mongoDbSnapshotContext   snapshot context providing partition, offset and the "last" flags
     * @param snapshotReceiver         receiver for the dispatched snapshot events
     * @param replicaSet               replica set owning the collection
     * @param collectionId             collection to export
     * @param mongoDbConnection        open connection to the replica set
     * @throws InterruptedException if the source is stopped while documents remain
     */
    private void createDataEventsForCollection(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbSnapshotContext mongoDbSnapshotContext, EventDispatcher.SnapshotReceiver<MongoDbPartition> snapshotReceiver, ReplicaSet replicaSet, CollectionId collectionId, MongoDbConnection mongoDbConnection) throws InterruptedException {
        final long startedAt = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data for collection '{}'", collectionId);
        mongoDbConnection.execute("sync '" + collectionId + "'", mongoClient -> {
            // An absent per-collection filter means "match everything".
            final Document filter = Document.parse(this.connectorConfig.getSnapshotFilterQueryForCollection(collectionId).orElse("{}"));
            final int batchSize = this.taskContext.getConnectorConfig().getSnapshotFetchSize();
            long exported = 0;

            try (MongoCursor<BsonDocument> cursor = mongoClient.getDatabase(collectionId.dbName())
                    .getCollection(collectionId.name(), BsonDocument.class)
                    .find(filter)
                    .batchSize(batchSize)
                    .iterator()) {
                mongoDbSnapshotContext.lastRecordInCollection = false;
                if (cursor.hasNext()) {
                    while (cursor.hasNext()) {
                        if (!changeEventSourceContext.isRunning()) {
                            throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.name());
                        }
                        final BsonDocument document = cursor.next();
                        exported++;
                        mongoDbSnapshotContext.lastRecordInCollection = !cursor.hasNext();
                        if (mongoDbSnapshotContext.lastCollection && mongoDbSnapshotContext.lastRecordInCollection) {
                            mongoDbSnapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
                        }
                        this.dispatcher.dispatchSnapshotEvent(mongoDbSnapshotContext.partition, collectionId, getChangeRecordEmitter(mongoDbSnapshotContext, collectionId, document, replicaSet), snapshotReceiver);
                    }
                } else if (mongoDbSnapshotContext.lastCollection) {
                    // Empty last collection: still flag the end of the snapshot.
                    mongoDbSnapshotContext.offset.markSnapshotRecord(SnapshotRecord.LAST);
                }
                LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", exported, collectionId, Strings.duration(this.clock.currentTimeInMillis() - startedAt));
                this.snapshotProgressListener.dataCollectionSnapshotCompleted(mongoDbSnapshotContext.partition, collectionId, exported);
            }
        });
    }

    /**
     * Records the read of one document on the replica set's offset context and builds the emitter
     * for it.
     *
     * @param snapshotContext context providing the offset
     * @param collectionId    collection the document was read from
     * @param bsonDocument    document to emit
     * @param replicaSet      replica set the document belongs to
     * @return a snapshot record emitter for the document
     */
    private ChangeRecordEmitter<MongoDbPartition> getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext> snapshotContext, CollectionId collectionId, BsonDocument bsonDocument, ReplicaSet replicaSet) {
        final MongoDbOffsetContext offsetContext = snapshotContext.offset;
        final ReplicaSetPartition partition = offsetContext.getReplicaSetPartition(replicaSet);
        final ReplicaSetOffsetContext replicaSetOffset = offsetContext.getReplicaSetOffsetContext(replicaSet);

        // Register the read with the current time before the emitter is created.
        replicaSetOffset.readEvent(collectionId, this.clock.currentTime());

        return new MongoDbSnapshotRecordEmitter(partition, replicaSetOffset, this.clock, bsonDocument, this.connectorConfig);
    }

    /**
     * @return the clock supplied at construction time
     */
    private Clock getClock() {
        return this.clock;
    }

    /**
     * Type-erased variant of {@code doExecute} that casts its arguments and delegates to the typed
     * overload.
     *
     * NOTE(review): the inline markers indicate that this is a compiler-generated bridge method made
     * explicit by the decompiler; hand-maintained source would normally not declare it — confirm
     * before keeping it.
     */
    protected /* bridge */ /* synthetic */ SnapshotResult doExecute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, OffsetContext offsetContext, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        return doExecute(changeEventSourceContext, (MongoDbOffsetContext) offsetContext, (AbstractSnapshotChangeEventSource.SnapshotContext<MongoDbPartition, MongoDbOffsetContext>) snapshotContext, snapshottingTask);
    }
}
