package io.debezium.connector.mongodb;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.function.BlockingRunnable;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.class */
public class MongoDbStreamingChangeEventSource implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    private final MongoDbConnectorConfig connectorConfig;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;
    private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
    private final MongoDbStreamingChangeEventSourceMetrics streamingMetrics;
    private MongoDbOffsetContext effectiveOffset;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource$StreamStatus.class */
    /**
     * Outcome of handling a single item read from the change stream cursor.
     */
    public enum StreamStatus {
        /** The wrapped action ran to completion without throwing (see {@code errorHandled}). */
        DISPATCHED,
        /** The split-event handler produced no document to dispatch; the loop moves on to the next event. */
        NEXT,
        /** The wrapped action was interrupted or threw; the streaming loop stops when it sees this value. */
        ERROR
    }

    /**
     * Wires up the collaborators needed to stream change events.
     *
     * @param mongoDbConnectorConfig connector configuration
     * @param mongoDbTaskContext task context; also supplies the connection context
     * @param changeEventSourceConnectionFactory factory used to obtain a connection per replica set
     * @param replicaSets the replica sets to stream from
     * @param eventDispatcher dispatcher that receives data-change and heartbeat events
     * @param errorHandler sink for failures raised while streaming
     * @param clock clock handed to the cursor and to record emitters
     * @param mongoDbStreamingChangeEventSourceMetrics streaming metrics handed to the cursor
     */
    public MongoDbStreamingChangeEventSource(
            MongoDbConnectorConfig mongoDbConnectorConfig,
            MongoDbTaskContext mongoDbTaskContext,
            MongoDbConnection.ChangeEventSourceConnectionFactory changeEventSourceConnectionFactory,
            ReplicaSets replicaSets,
            EventDispatcher<MongoDbPartition, CollectionId> eventDispatcher,
            ErrorHandler errorHandler,
            Clock clock,
            MongoDbStreamingChangeEventSourceMetrics mongoDbStreamingChangeEventSourceMetrics) {
        // Configuration and task-level state.
        this.connectorConfig = mongoDbConnectorConfig;
        this.taskContext = mongoDbTaskContext;
        this.connectionContext = mongoDbTaskContext.getConnectionContext();
        this.replicaSets = replicaSets;
        this.connections = changeEventSourceConnectionFactory;

        // Event pipeline collaborators.
        this.dispatcher = eventDispatcher;
        this.errorHandler = errorHandler;
        this.streamingMetrics = mongoDbStreamingChangeEventSourceMetrics;
        this.clock = clock;
    }

    /**
     * Records the offsets streaming will work from.
     *
     * @param mongoDbOffsetContext previously stored offsets, or {@code null} to start from
     *        freshly created empty offsets
     */
    public void init(MongoDbOffsetContext mongoDbOffsetContext) {
        if (mongoDbOffsetContext != null) {
            this.effectiveOffset = mongoDbOffsetContext;
            return;
        }
        this.effectiveOffset = emptyOffsets(this.connectorConfig);
    }

    /**
     * Streams changes from every configured replica set: inline on the calling thread when
     * there is exactly one, through a thread pool when there are several, and not at all
     * when there are none.
     *
     * <p>The {@code mongoDbOffsetContext} argument is not read here; the offsets stored by
     * {@link #init(MongoDbOffsetContext)} are used instead.
     *
     * @param changeEventSourceContext running/paused state of the source
     * @param mongoDbPartition partition the connections are obtained for
     * @param mongoDbOffsetContext unused by this implementation
     * @throws InterruptedException declared by the signature
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, MongoDbOffsetContext mongoDbOffsetContext) throws InterruptedException {
        final List<ReplicaSet> targets = this.replicaSets.all();
        final int targetCount = targets.size();
        if (targetCount > 1) {
            streamChangesForReplicaSets(changeEventSourceContext, mongoDbPartition, targets);
            return;
        }
        if (targetCount == 1) {
            streamChangesForReplicaSet(changeEventSourceContext, mongoDbPartition, targets.get(0));
        }
    }

    /* renamed from: getOffsetContext, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the offsets currently in effect for streaming.
     *
     * @return the offset context stored by {@code init}, or {@code null} if {@code init}
     *         has not assigned one yet
     */
    public MongoDbOffsetContext m25getOffsetContext() {
        final MongoDbOffsetContext current = this.effectiveOffset;
        return current;
    }

    /**
     * Opens a connection to one replica set and reads its change stream until streaming stops.
     *
     * <p>The connection is managed by try-with-resources so it is closed on every exit path,
     * including when {@code execute} throws. Any failure is logged and handed to the error
     * handler rather than propagated to the caller.
     *
     * @param changeEventSourceContext running/paused state of the source
     * @param mongoDbPartition partition the connection is obtained for
     * @param replicaSet replica set to stream from
     */
    private void streamChangesForReplicaSet(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, ReplicaSet replicaSet) {
        try (MongoDbConnection mongoDbConnection = this.connections.get(replicaSet, mongoDbPartition)) {
            mongoDbConnection.execute("read from change stream on '" + replicaSet + "'", mongoClient -> {
                readChangeStream(mongoClient, replicaSet, changeEventSourceContext);
            });
        } catch (Throwable th) {
            // Deliberately broad: this may run on a pool thread, so nothing may escape unreported.
            LOGGER.error("Streaming for replica set {} failed", replicaSet.replicaSetName(), th);
            this.errorHandler.setProducerThrowable(th);
        }
    }

    /**
     * Streams several replica sets concurrently, one pool thread per replica set, and blocks
     * until every streaming task has finished or the calling thread is interrupted.
     *
     * <p>The executor is shut down in a {@code finally} block so its threads are released even
     * if submitting a task fails.
     *
     * @param changeEventSourceContext running/paused state of the source
     * @param mongoDbPartition partition the connections are obtained for
     * @param list replica sets to stream from
     */
    private void streamChangesForReplicaSets(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext, MongoDbPartition mongoDbPartition, List<ReplicaSet> list) {
        int size = list.size();
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, this.taskContext.serverName(), "replicator-streaming", size);
        try {
            CountDownLatch remaining = new CountDownLatch(size);
            LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", size, list);
            for (ReplicaSet replicaSet : list) {
                executor.submit(() -> {
                    try {
                        streamChangesForReplicaSet(changeEventSourceContext, mongoDbPartition, replicaSet);
                    } finally {
                        // Always release the latch, otherwise the waiting thread would hang.
                        remaining.countDown();
                    }
                });
            }
            try {
                remaining.await();
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
            }
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Reads the change stream of one replica set and dispatches what it yields until the source
     * stops running or a dispatch reports {@link StreamStatus#ERROR}.
     *
     * <p>Each iteration first honours a pause request, then polls the cursor. An event that
     * carries a document is processed as a change event; an event without a document triggers
     * a heartbeat. The cursor is closed on every exit path by try-with-resources.
     *
     * @param mongoClient client used to open the change stream
     * @param replicaSet replica set being streamed
     * @param changeEventSourceContext running/paused state of the source
     */
    private void readChangeStream(MongoClient mongoClient, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) {
        LOGGER.info("Reading change stream for '{}'", replicaSet);
        ReplicaSetPartition replicaSetPartition = this.effectiveOffset.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext replicaSetOffsetContext = this.effectiveOffset.getReplicaSetOffsetContext(replicaSet);
        SplitEventHandler<BsonDocument> splitEventHandler = new SplitEventHandler<>();
        ChangeStreamIterable<BsonDocument> changeStream = initChangeStream(mongoClient, replicaSetOffsetContext);

        try (BufferingChangeStreamCursor<BsonDocument> cursor = BufferingChangeStreamCursor.fromIterable(changeStream, this.taskContext, this.streamingMetrics, this.clock).start()) {
            while (changeEventSourceContext.isRunning()) {
                waitWhenStreamingPaused(changeEventSourceContext);

                BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> event = cursor.m36tryNext();
                if (event == null) {
                    // Nothing available right now; poll again.
                    continue;
                }

                StreamStatus status = event.document
                        .map(document -> processChangeStreamDocument(document, splitEventHandler, replicaSet, replicaSetPartition, replicaSetOffsetContext))
                        .orElseGet(() -> errorHandled(() -> dispatchHeartbeatEvent(event, replicaSetPartition, replicaSetOffsetContext)));
                if (status == StreamStatus.ERROR) {
                    return;
                }
            }
        }
    }

    /**
     * If a pause has been requested, acknowledges it and blocks until the snapshot completes.
     * The status returned by the error-handling wrapper is discarded.
     *
     * @param changeEventSourceContext running/paused state of the source
     */
    private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext changeEventSourceContext) {
        if (!changeEventSourceContext.isPaused()) {
            return;
        }
        errorHandled(() -> {
            LOGGER.info("Streaming will now pause");
            changeEventSourceContext.streamingPaused();
            changeEventSourceContext.waitSnapshotCompletion();
            LOGGER.info("Streaming resumed");
        });
    }

    /**
     * Passes a change stream document through the split-event handler and, when the handler
     * yields a document, dispatches it.
     *
     * @param changeStreamDocument document received from the cursor
     * @param splitEventHandler handler the document is passed through before dispatch
     * @param replicaSet replica set the document came from
     * @param replicaSetPartition partition used for dispatch
     * @param replicaSetOffsetContext offsets updated on dispatch
     * @return the dispatch outcome, or {@link StreamStatus#NEXT} when the handler yields nothing
     */
    private StreamStatus processChangeStreamDocument(ChangeStreamDocument<BsonDocument> changeStreamDocument, SplitEventHandler<BsonDocument> splitEventHandler, ReplicaSet replicaSet, ReplicaSetPartition replicaSetPartition, ReplicaSetOffsetContext replicaSetOffsetContext) {
        LOGGER.trace("Arrived Change Stream event: {}", changeStreamDocument);
        return splitEventHandler.handle(changeStreamDocument)
                .map(handled -> errorHandled(() -> dispatchChangeEvent(handled, replicaSet, replicaSetPartition, replicaSetOffsetContext)))
                .orElse(StreamStatus.NEXT);
    }

    /**
     * Builds the collection id and record emitter for a change stream document, records the
     * document in the replica set offsets, and hands the emitter to the dispatcher.
     *
     * @param changeStreamDocument document to dispatch
     * @param replicaSet replica set the document came from
     * @param replicaSetPartition partition used for dispatch
     * @param replicaSetOffsetContext offsets updated with this document
     * @throws InterruptedException declared by the signature; raised by callees
     */
    private void dispatchChangeEvent(ChangeStreamDocument<BsonDocument> changeStreamDocument, ReplicaSet replicaSet, ReplicaSetPartition replicaSetPartition, ReplicaSetOffsetContext replicaSetOffsetContext) throws InterruptedException {
        final CollectionId target = new CollectionId(
                replicaSet.replicaSetName(),
                changeStreamDocument.getNamespace().getDatabaseName(),
                changeStreamDocument.getNamespace().getCollectionName());
        final MongoDbChangeRecordEmitter emitter = new MongoDbChangeRecordEmitter(
                replicaSetPartition,
                replicaSetOffsetContext,
                this.clock,
                changeStreamDocument,
                this.connectorConfig);

        // The emitter is created before the offsets are advanced; keep this order.
        replicaSetOffsetContext.changeStreamEvent(changeStreamDocument);
        this.dispatcher.dispatchDataChangeEvent(replicaSetPartition, target, emitter);
    }

    /**
     * Handles a cursor event that carries no document: records it in the replica set offsets
     * and then asks the dispatcher to emit a heartbeat.
     *
     * @param resumableChangeStreamEvent the document-less event returned by the cursor
     * @param replicaSetPartition partition used for the heartbeat
     * @param replicaSetOffsetContext offsets updated via {@code noEvent} and sent with the heartbeat
     * @throws InterruptedException declared by the signature; raised by callees
     */
    private void dispatchHeartbeatEvent(BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> resumableChangeStreamEvent, ReplicaSetPartition replicaSetPartition, ReplicaSetOffsetContext replicaSetOffsetContext) throws InterruptedException {
        LOGGER.trace("No Change Stream event arrived");
        // Update the offsets first so the heartbeat is dispatched with the refreshed context.
        replicaSetOffsetContext.noEvent(resumableChangeStreamEvent);
        this.dispatcher.dispatchHeartbeatEvent(replicaSetPartition, replicaSetOffsetContext);
    }

    /**
     * Runs an action and converts its outcome into a {@link StreamStatus}.
     *
     * <p>An interruption is logged and the thread's interrupt flag is restored; any other
     * exception is passed to the error handler. Both cases yield {@link StreamStatus#ERROR}.
     *
     * @param blockingRunnable action to run
     * @return {@link StreamStatus#DISPATCHED} if the action completed normally,
     *         otherwise {@link StreamStatus#ERROR}
     */
    private StreamStatus errorHandled(BlockingRunnable blockingRunnable) {
        StreamStatus outcome = StreamStatus.ERROR;
        try {
            blockingRunnable.run();
            outcome = StreamStatus.DISPATCHED;
        } catch (InterruptedException interrupted) {
            LOGGER.info("Replicator thread is interrupted");
            Thread.currentThread().interrupt();
        } catch (Exception failure) {
            this.errorHandler.setProducerThrowable(failure);
        }
        return outcome;
    }

    /**
     * Opens a change stream and configures it from the capture mode, the stored resume
     * position, and the configured cursor await time.
     *
     * <p>A stored resume token takes precedence over a stored operation timestamp; if neither
     * is present, no start position is set. The await time is applied only when positive.
     *
     * @param mongoClient client used to open the change stream
     * @param replicaSetOffsetContext offsets supplying the resume token or timestamp
     * @return the configured change stream iterable
     */
    protected ChangeStreamIterable<BsonDocument> initChangeStream(MongoClient mongoClient, ReplicaSetOffsetContext replicaSetOffsetContext) {
        final ChangeStreamIterable<BsonDocument> stream = MongoUtil.openChangeStream(mongoClient, this.taskContext);

        // Capture-mode driven options.
        if (this.taskContext.getCaptureMode().isFullUpdate()) {
            stream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        if (this.taskContext.getCaptureMode().isIncludePreImage()) {
            stream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
        }

        // Start position: resume token first, operation time as fallback.
        final String resumeToken = replicaSetOffsetContext.lastResumeToken();
        if (resumeToken != null) {
            LOGGER.info("Resuming streaming from token '{}'", resumeToken);
            final BsonDocument resumeDocument = new BsonDocument();
            resumeDocument.put("_data", new BsonString(resumeToken));
            stream.resumeAfter(resumeDocument);
        } else if (replicaSetOffsetContext.lastTimestamp() != null) {
            LOGGER.info("Resuming streaming from operation time '{}'", replicaSetOffsetContext.lastTimestamp());
            stream.startAtOperationTime(replicaSetOffsetContext.lastTimestamp());
        }

        final long maxAwaitMillis = this.connectorConfig.getCursorMaxAwaitTime();
        if (maxAwaitMillis > 0) {
            stream.maxAwaitTime(maxAwaitMillis, TimeUnit.MILLISECONDS);
        }
        return stream;
    }

    /**
     * Creates an offset context with no stored position.
     *
     * @param mongoDbConnectorConfig configuration used to build the source info
     * @return a new offset context built from a fresh source info, transaction context, and
     *         incremental snapshot context
     */
    protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig mongoDbConnectorConfig) {
        LOGGER.info("Initializing empty Offset context");
        final SourceInfo sourceInfo = new SourceInfo(mongoDbConnectorConfig);
        final TransactionContext transactionContext = new TransactionContext();
        return new MongoDbOffsetContext(sourceInfo, transactionContext, new MongoDbIncrementalSnapshotContext(false));
    }
}
