package io.debezium.connector.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCursor;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.RecordMakers;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.BufferedBlockingConsumer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:io/debezium/connector/mongodb/Replicator.class */
public class Replicator {
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private final MongoDbTaskContext context;
    private final ExecutorService copyThreads;
    private final ReplicaSet replicaSet;
    private final String rsName;
    private final SourceInfo source;
    private final RecordMakers recordMakers;
    private final BufferableRecorder bufferedRecorder;
    private final Clock clock;
    private ConnectionContext.MongoPrimary primaryClient;
    private final Consumer<Throwable> onFailure;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final AtomicBoolean running = new AtomicBoolean();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/debezium/connector/mongodb/Replicator$BufferableRecorder.class */
    public final class BufferableRecorder implements BlockingConsumer<SourceRecord> {
        private final BlockingConsumer<SourceRecord> actual;
        private BufferedBlockingConsumer<SourceRecord> buffered;
        private volatile BlockingConsumer<SourceRecord> current;
        static final /* synthetic */ boolean $assertionsDisabled;

        public BufferableRecorder(BlockingConsumer<SourceRecord> blockingConsumer) {
            this.actual = blockingConsumer;
            this.current = this.actual;
        }

        protected synchronized void startBuffering() throws InterruptedException {
            this.buffered = BufferedBlockingConsumer.bufferLast(this.actual);
            this.current = this.buffered;
        }

        protected synchronized void stopBuffering(Map<String, ?> map) throws InterruptedException {
            if (!$assertionsDisabled && map == null) {
                throw new AssertionError();
            }
            this.buffered.close(sourceRecord -> {
                if (sourceRecord == null) {
                    return null;
                }
                return new SourceRecord(sourceRecord.sourcePartition(), map, sourceRecord.topic(), sourceRecord.kafkaPartition(), sourceRecord.keySchema(), sourceRecord.key(), sourceRecord.valueSchema(), sourceRecord.value());
            });
            this.current = this.actual;
        }

        public void accept(SourceRecord sourceRecord) throws InterruptedException {
            this.current.accept(sourceRecord);
        }

        static {
            $assertionsDisabled = !Replicator.class.desiredAssertionStatus();
        }
    }

    public Replicator(MongoDbTaskContext mongoDbTaskContext, ReplicaSet replicaSet, BlockingConsumer<SourceRecord> blockingConsumer, Consumer<Throwable> consumer) {
        if (!$assertionsDisabled && mongoDbTaskContext == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && replicaSet == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && blockingConsumer == null) {
            throw new AssertionError();
        }
        this.context = mongoDbTaskContext;
        this.source = mongoDbTaskContext.source();
        this.replicaSet = replicaSet;
        this.rsName = replicaSet.replicaSetName();
        this.copyThreads = Threads.newFixedThreadPool(MongoDbConnector.class, mongoDbTaskContext.serverName(), "copy-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main"), mongoDbTaskContext.getConnectionContext().maxNumberOfCopyThreads());
        this.bufferedRecorder = new BufferableRecorder(blockingConsumer);
        this.recordMakers = new RecordMakers(mongoDbTaskContext.filters(), this.source, mongoDbTaskContext.topicSelector(), this.bufferedRecorder, mongoDbTaskContext.isEmitTombstoneOnDelete());
        this.clock = this.context.getClock();
        this.onFailure = consumer;
    }

    public void stop() {
        this.copyThreads.shutdownNow();
        this.running.set(false);
    }

    public void run() {
        try {
            if (this.running.compareAndSet(false, true)) {
                try {
                    if (establishConnectionToPrimary()) {
                        if (isInitialSyncExpected()) {
                            recordCurrentOplogPosition();
                            if (!performInitialSync()) {
                                if (this.primaryClient != null) {
                                    this.primaryClient.stop();
                                }
                                this.running.set(false);
                                return;
                            }
                        }
                        readOplog();
                    }
                    if (this.primaryClient != null) {
                        this.primaryClient.stop();
                    }
                    this.running.set(false);
                } catch (Throwable th) {
                    this.logger.error("Replicator for replica set {} failed", this.rsName, th);
                    this.onFailure.accept(th);
                    if (this.primaryClient != null) {
                        this.primaryClient.stop();
                    }
                    this.running.set(false);
                }
            }
        } catch (Throwable th2) {
            if (this.primaryClient != null) {
                this.primaryClient.stop();
            }
            this.running.set(false);
            throw th2;
        }
    }

    protected boolean establishConnectionToPrimary() {
        this.logger.info("Connecting to '{}'", this.replicaSet);
        this.primaryClient = this.context.getConnectionContext().primaryFor(this.replicaSet, this.context.filters(), (str, th) -> {
            if (th.getMessage() != null && th.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + str, th);
            }
            this.logger.error("Error while attempting to {}: {}", new Object[]{str, th.getMessage(), th});
        });
        return this.primaryClient != null;
    }

    protected void recordCurrentOplogPosition() {
        this.primaryClient.execute("get oplog position", mongoClient -> {
            this.source.offsetStructForEvent(this.replicaSet.replicaSetName(), (Document) mongoClient.getDatabase("local").getCollection("oplog.rs").find().sort(new Document("$natural", -1)).limit(1).first());
        });
    }

    protected boolean isInitialSyncExpected() {
        boolean z;
        if (this.source.hasOffset(this.rsName)) {
            this.logger.info("Found existing offset for replica set '{}' at {}", this.rsName, this.source.lastOffset(this.rsName));
            z = false;
            if (this.context.getConnectionContext().performSnapshotEvenIfNotNeeded()) {
                this.logger.info("Configured to performing initial sync of replica set '{}'", this.rsName);
                z = true;
            } else if (this.source.isInitialSyncOngoing(this.rsName)) {
                this.logger.info("The previous initial sync was incomplete for '{}', so initiating another initial sync", this.rsName);
                z = true;
            } else {
                BsonTimestamp lastOffsetTimestamp = this.source.lastOffsetTimestamp(this.rsName);
                BsonTimestamp bsonTimestamp = (BsonTimestamp) this.primaryClient.execute("get oplog position", mongoClient -> {
                    return SourceInfo.extractEventTimestamp((Document) mongoClient.getDatabase("local").getCollection("oplog.rs").find().sort(new Document("$natural", 1)).limit(1).first());
                });
                if (bsonTimestamp == null) {
                    this.logger.info("The oplog contains no entries, so performing initial sync of replica set '{}'", this.rsName);
                    z = true;
                } else if (lastOffsetTimestamp.compareTo(bsonTimestamp) < 0) {
                    this.logger.info("Initial sync is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}", new Object[]{this.rsName, bsonTimestamp, lastOffsetTimestamp});
                    z = true;
                } else {
                    this.logger.info("The oplog contains the last entry previously read for '{}', so no initial sync will be performed", this.rsName);
                }
            }
        } else {
            this.logger.info("No existing offset found for replica set '{}', starting initial sync", this.rsName);
            z = true;
        }
        return z;
    }

    protected boolean performInitialSync() {
        try {
            delaySnapshotIfNeeded();
            this.logger.info("Beginning initial sync of '{}' at {}", this.rsName, this.source.lastOffset(this.rsName));
            this.source.startInitialSync(this.replicaSet.replicaSetName());
            try {
                this.bufferedRecorder.startBuffering();
                long currentTimeInMillis = this.clock.currentTimeInMillis();
                List<CollectionId> collections = this.primaryClient.collections();
                ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue(collections);
                int min = Math.min(collections.size(), this.context.getConnectionContext().maxNumberOfCopyThreads());
                CountDownLatch countDownLatch = new CountDownLatch(min);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                AtomicInteger atomicInteger = new AtomicInteger(0);
                AtomicInteger atomicInteger2 = new AtomicInteger();
                AtomicLong atomicLong = new AtomicLong();
                this.logger.info("Preparing to use {} thread(s) to sync {} collection(s): {}", new Object[]{Integer.valueOf(min), Integer.valueOf(collections.size()), Strings.join(", ", collections)});
                for (int i = 0; i != min; i++) {
                    this.copyThreads.submit(() -> {
                        CollectionId collectionId;
                        this.context.configureLoggingContext(this.replicaSet.replicaSetName() + "-sync" + atomicInteger.incrementAndGet());
                        while (!atomicBoolean.get() && (collectionId = (CollectionId) concurrentLinkedQueue.poll()) != null) {
                            try {
                                try {
                                    long currentTimeInMillis2 = this.clock.currentTimeInMillis();
                                    this.logger.info("Starting initial sync of '{}'", collectionId);
                                    long copyCollection = copyCollection(collectionId, currentTimeInMillis);
                                    atomicInteger2.incrementAndGet();
                                    atomicLong.addAndGet(copyCollection);
                                    this.logger.info("Completing initial sync of {} documents from '{}' in {}", new Object[]{Long.valueOf(copyCollection), collectionId, Strings.duration(this.clock.currentTimeInMillis() - currentTimeInMillis2)});
                                } catch (InterruptedException e) {
                                    atomicBoolean.set(true);
                                    countDownLatch.countDown();
                                    return;
                                }
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });
                }
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    atomicBoolean.set(true);
                }
                this.copyThreads.shutdown();
                long currentTimeInMillis2 = this.clock.currentTimeInMillis() - currentTimeInMillis;
                if (atomicBoolean.get()) {
                    this.logger.info("Initial sync aborted after {} with {} of {} collections incomplete", new Object[]{Strings.duration(currentTimeInMillis2), Integer.valueOf(collections.size() - atomicInteger2.get()), Integer.valueOf(collections.size())});
                    return false;
                }
                this.source.stopInitialSync(this.replicaSet.replicaSetName());
                try {
                    this.bufferedRecorder.stopBuffering(this.source.lastOffset(this.rsName));
                    this.logger.info("Initial sync of {} collections with a total of {} documents completed in {}", new Object[]{Integer.valueOf(collections.size()), Long.valueOf(atomicLong.get()), Strings.duration(currentTimeInMillis2)});
                    return true;
                } catch (InterruptedException e2) {
                    this.logger.info("Interrupted while waiting for last initial sync record from replica set '{}' to be recorded", this.rsName);
                    return false;
                }
            } catch (InterruptedException e3) {
                this.logger.info("Interrupted while waiting to flush the buffer before starting an initial sync of '{}'", this.rsName);
                return false;
            }
        } catch (InterruptedException e4) {
            this.logger.info("Interrupted while awaiting initial snapshot delay");
            return false;
        }
    }

    private void delaySnapshotIfNeeded() throws InterruptedException {
        Duration ofMillis = Duration.ofMillis(this.context.getConnectionContext().config.getLong(CommonConnectorConfig.SNAPSHOT_DELAY_MS));
        if (ofMillis.isZero() || ofMillis.isNegative()) {
            return;
        }
        Threads.Timer timer = Threads.timer(Clock.SYSTEM, ofMillis);
        Metronome parker = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
        while (!timer.expired()) {
            if (!this.running.get()) {
                throw new InterruptedException("Interrupted while awaiting initial snapshot delay");
            }
            this.logger.info("The connector will wait for {}s before proceeding", Long.valueOf(timer.remaining().getSeconds()));
            parker.pause();
        }
    }

    protected long copyCollection(CollectionId collectionId, long j) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        this.primaryClient.executeBlocking("sync '" + collectionId + "'", mongoClient -> {
            atomicLong.set(copyCollection(mongoClient, collectionId, j));
        });
        return atomicLong.get();
    }

    protected long copyCollection(MongoClient mongoClient, CollectionId collectionId, long j) throws InterruptedException {
        RecordMakers.RecordsForCollection forCollection = this.recordMakers.forCollection(collectionId);
        long j2 = 0;
        MongoCursor it = mongoClient.getDatabase(collectionId.dbName()).getCollection(collectionId.name()).find().iterator();
        Throwable th = null;
        while (this.running.get() && it.hasNext()) {
            try {
                try {
                    this.logger.trace("Found existing doc in {}: {}", collectionId, (Document) it.next());
                    j2 += forCollection.recordObject(collectionId, r0, j);
                } finally {
                }
            } catch (Throwable th2) {
                if (it != null) {
                    if (th != null) {
                        try {
                            it.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        it.close();
                    }
                }
                throw th2;
            }
        }
        if (it != null) {
            if (0 != 0) {
                try {
                    it.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                it.close();
            }
        }
        return j2;
    }

    protected void readOplog() {
        this.primaryClient.execute("read from oplog on '" + this.replicaSet + "'", this::readOplog);
    }

    protected void readOplog(MongoClient mongoClient) {
        BsonTimestamp lastOffsetTimestamp = this.source.lastOffsetTimestamp(this.replicaSet.replicaSetName());
        this.logger.info("Reading oplog for '{}' primary {} starting at {}", new Object[]{this.replicaSet, mongoClient.getAddress(), lastOffsetTimestamp});
        FindIterable cursorType = mongoClient.getDatabase("local").getCollection("oplog.rs").find(com.mongodb.client.model.Filters.and(new Bson[]{com.mongodb.client.model.Filters.gt("ts", lastOffsetTimestamp), com.mongodb.client.model.Filters.exists("fromMigrate", false)})).sort(new Document("$natural", 1)).oplogReplay(true).cursorType(CursorType.TailableAwait);
        ServerAddress address = mongoClient.getAddress();
        MongoCursor it = cursorType.iterator();
        Throwable th = null;
        do {
            try {
                try {
                    if (!this.running.get() || !it.hasNext()) {
                        if (it != null) {
                            if (0 == 0) {
                                it.close();
                                return;
                            }
                            try {
                                it.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } catch (Throwable th4) {
                if (it != null) {
                    if (th != null) {
                        try {
                            it.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        it.close();
                    }
                }
                throw th4;
            }
        } while (handleOplogEvent(address, (Document) it.next()));
        if (it != null) {
            if (0 == 0) {
                it.close();
                return;
            }
            try {
                it.close();
            } catch (Throwable th6) {
                th.addSuppressed(th6);
            }
        }
    }

    protected boolean handleOplogEvent(ServerAddress serverAddress, Document document) {
        this.logger.debug("Found event: {}", document);
        String string = document.getString(SourceInfo.NAMESPACE);
        Document document2 = (Document) document.get("o", Document.class);
        if (document2 == null) {
            this.logger.warn("Missing 'o' field in event, so skipping {}", document.toJson());
            return true;
        }
        if (string == null || string.isEmpty()) {
            if ("new primary".equals(document2.getString("msg"))) {
                AtomicReference atomicReference = new AtomicReference();
                try {
                    this.primaryClient.executeBlocking("conn", mongoClient -> {
                        atomicReference.set(mongoClient.getAddress());
                    });
                } catch (InterruptedException e) {
                    this.logger.error("Get current primary executeBlocking", e);
                }
                ServerAddress serverAddress2 = (ServerAddress) atomicReference.get();
                if (serverAddress2 == null || serverAddress2.equals(serverAddress)) {
                    this.logger.info("Found new primary event in oplog, current {} is new primary. Continue to process oplog event.", serverAddress);
                } else {
                    this.logger.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", serverAddress, serverAddress2);
                }
            }
            this.logger.debug("Skipping event with no namespace: {}", document.toJson());
            return true;
        }
        int indexOf = string.indexOf(46);
        if (indexOf <= 0) {
            return true;
        }
        if (!$assertionsDisabled && indexOf + 1 >= string.length()) {
            throw new AssertionError();
        }
        String substring = string.substring(0, indexOf);
        String substring2 = string.substring(indexOf + 1);
        if ("$cmd".equals(substring2)) {
            this.logger.debug("Skipping database command event: {}", document.toJson());
            return true;
        }
        if (!this.context.filters().databaseFilter().test(substring)) {
            this.logger.debug("Skipping the event for database {} based on database.whitelist");
            return true;
        }
        CollectionId collectionId = new CollectionId(this.rsName, substring, substring2);
        if (!this.context.filters().collectionFilter().test(collectionId)) {
            return true;
        }
        try {
            this.recordMakers.forCollection(collectionId).recordEvent(document, this.clock.currentTimeInMillis());
            return true;
        } catch (InterruptedException e2) {
            Thread.interrupted();
            return false;
        }
    }

    static {
        $assertionsDisabled = !Replicator.class.desiredAssertionStatus();
    }
}
