package com.mongodb.kafka.connect.source;

import com.mongodb.MongoNamespace;
import com.mongodb.client.MongoClient;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonBinaryReader;
import org.bson.BsonDocument;
import org.bson.BsonType;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/mongodb/kafka/connect/source/MongoCopyDataManager.class */
/**
 * Copies the existing data of the selected namespaces into a bounded in-memory queue.
 *
 * <p>On construction one copy task per selected namespace is submitted to a fixed thread pool.
 * Each task runs an aggregation over its collection and puts every resulting document on the
 * queue; consumers drain the queue through {@link #poll()}. A failure in any task is recorded and
 * rethrown to the consumer as a {@link ConnectException} on the next {@link #poll()}.
 */
public class MongoCopyDataManager implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoCopyDataManager.class);
    private static final String NAMESPACE_FIELD = "ns";
    private static final byte[] NAMESPACE_BYTES = NAMESPACE_FIELD.getBytes(StandardCharsets.UTF_8);
    // The inner '%%s' placeholders survive this first format() call as '%s' and are filled with the
    // database and collection names in createPipeline.
    private static final String PIPELINE_TEMPLATE = String.format("{$replaceRoot: {newRoot: {_id: {_id: '$_id', copyingData: true}, operationType: 'insert', %s: {db: '%%s', coll: '%%s'}, documentKey: {_id: '$_id'}, fullDocument: '$$ROOT'}}}", NAMESPACE_FIELD);
    // Alias the namespace field is renamed to inside the aggregation. convertDocument overwrites the
    // alias in place with NAMESPACE_FIELD, which only works because both names have the same
    // encoded length.
    static final String ALT_NAMESPACE_FIELD = "__";
    private static final BsonDocument ADD_ALT_NAMESPACE_STAGE = BsonDocument.parse(String.format("{'$addFields': {'%s': '$%s'}}", ALT_NAMESPACE_FIELD, NAMESPACE_FIELD));
    private static final BsonDocument UNSET_ORIGINAL_NAMESPACE_STAGE = BsonDocument.parse(String.format("{'$project': {'%s': 0}}", NAMESPACE_FIELD));

    private volatile boolean closed;
    // First failure raised by a copy task; surfaced to the consumer by poll().
    private volatile Exception errorException;
    // Number of namespaces whose copy task has not yet completed successfully.
    private final AtomicInteger namespacesToCopy;
    private final MongoSourceConfig sourceConfig;
    private final MongoClient mongoClient;
    private final ExecutorService executor;
    private final ArrayBlockingQueue<BsonDocument> queue;

    /**
     * Selects the namespaces to copy and immediately starts one copy task per namespace.
     *
     * @param mongoSourceConfig the source configuration (database, collection, copy settings)
     * @param mongoClient the client used to list namespaces and read the data
     */
    public MongoCopyDataManager(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        this.sourceConfig = mongoSourceConfig;
        this.mongoClient = mongoClient;
        List<MongoNamespace> namespaces = selectNamespaces(mongoSourceConfig, mongoClient);
        LOGGER.info("Copying existing data on the following namespaces: {}", namespaces);
        this.namespacesToCopy = new AtomicInteger(namespaces.size());
        MongoSourceConfig.StartupConfig.CopyExistingConfig copyConfig = mongoSourceConfig.getStartupConfig().copyExistingConfig();
        this.queue = new ArrayBlockingQueue<>(copyConfig.queueSize());
        // At least one thread (the pool size must be positive), at most one thread per namespace.
        int threadCount = Math.max(1, Math.min(namespaces.size(), copyConfig.maxThreads()));
        this.executor = Executors.newFixedThreadPool(threadCount);
        for (MongoNamespace namespace : namespaces) {
            this.executor.submit(() -> copyDataFrom(namespace));
        }
    }

    /**
     * Returns the next copied document, if one is currently queued.
     *
     * <p>Shuts the executor down once every namespace has been copied.
     *
     * @return the next queued document, or an empty {@code Optional} when the queue is empty
     * @throws ConnectException wrapping the recorded failure if a copy task failed
     */
    public Optional<BsonDocument> poll() {
        // Read the failure once, before close(): closing interrupts the remaining workers, and the
        // exception thrown here must be the one that was actually observed.
        Exception failure = this.errorException;
        if (failure != null) {
            close();
            throw new ConnectException(failure);
        }
        if (this.namespacesToCopy.get() == 0) {
            close();
        }
        return Optional.ofNullable(this.queue.poll());
    }

    /**
     * @return {@code true} while namespaces remain to be copied or queued documents remain unread
     */
    public boolean isCopying() {
        return this.namespacesToCopy.get() > 0 || !this.queue.isEmpty();
    }

    /** Stops the copy executor, interrupting any running copy task. Subsequent calls are no-ops. */
    @Override // java.lang.AutoCloseable
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        LOGGER.debug("Shutting down copy data manager executors");
        this.executor.shutdownNow();
    }

    /**
     * Runs the copy aggregation for one namespace and queues every resulting document.
     *
     * <p>The namespace counter is decremented only on success; any failure is recorded for
     * {@link #poll()} instead.
     */
    private void copyDataFrom(MongoNamespace mongoNamespace) {
        LOGGER.debug("Copying existing data from: {}", mongoNamespace.getFullName());
        try {
            this.mongoClient
                    .getDatabase(mongoNamespace.getDatabaseName())
                    .getCollection(mongoNamespace.getCollectionName(), RawBsonDocument.class)
                    .aggregate(createPipeline(this.sourceConfig, mongoNamespace))
                    .allowDiskUse(this.sourceConfig.getStartupConfig().copyExistingConfig().allowDiskUse())
                    .forEach(this::putToQueue);
            this.namespacesToCopy.decrementAndGet();
        } catch (Exception e) {
            // Keep the first failure: later ones are typically interruptions caused by close()
            // reacting to that first failure and would otherwise mask the root cause.
            if (this.errorException == null) {
                this.errorException = e;
            }
        }
    }

    /** Converts the document and blocks until there is room for it on the queue. */
    private void putToQueue(RawBsonDocument rawBsonDocument) {
        try {
            this.queue.put(convertDocument(rawBsonDocument));
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the worker thread still sees the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Determines which namespaces to copy.
     *
     * <p>With no database configured all collections returned by {@code getCollections(mongoClient)}
     * are candidates; with only a database configured, all of that database's collections; with
     * both configured, exactly that one namespace. A non-empty namespace regex then keeps only the
     * namespaces whose full name it matches.
     *
     * @return the namespaces to copy, possibly empty
     */
    static List<MongoNamespace> selectNamespaces(MongoSourceConfig mongoSourceConfig, MongoClient mongoClient) {
        String database = mongoSourceConfig.getString("database");
        String collection = mongoSourceConfig.getString("collection");
        String namespaceRegex = mongoSourceConfig.getStartupConfig().copyExistingConfig().namespaceRegex();

        List<MongoNamespace> namespaces;
        if (database.isEmpty()) {
            namespaces = getCollections(mongoClient);
        } else if (collection.isEmpty()) {
            namespaces = getCollections(mongoClient, database);
        } else {
            namespaces = Collections.singletonList(createNamespace(database, collection));
        }

        if (namespaceRegex.isEmpty()) {
            return namespaces;
        }
        Predicate<String> matchesRegex = Pattern.compile(namespaceRegex).asPredicate();
        return namespaces.stream()
                .filter(namespace -> matchesRegex.test(namespace.getFullName()))
                .collect(Collectors.toList());
    }

    /**
     * Builds the aggregation pipeline used to copy one namespace.
     *
     * <p>Stage order: the optional copy-existing pipeline, the {@code $replaceRoot} stage built from
     * the template for this namespace, the optional main pipeline, then the two stages that move
     * the namespace field to {@link #ALT_NAMESPACE_FIELD} and drop the original.
     */
    static List<Bson> createPipeline(MongoSourceConfig mongoSourceConfig, MongoNamespace mongoNamespace) {
        List<Bson> stages = new ArrayList<>();
        mongoSourceConfig.getStartupConfig().copyExistingConfig().pipeline().ifPresent(stages::addAll);
        stages.add(BsonDocument.parse(String.format(PIPELINE_TEMPLATE, mongoNamespace.getDatabaseName(), mongoNamespace.getCollectionName())));
        mongoSourceConfig.getPipeline().ifPresent(stages::addAll);
        stages.add(ADD_ALT_NAMESPACE_STAGE);
        stages.add(UNSET_ORIGINAL_NAMESPACE_STAGE);
        return stages;
    }

    /**
     * Renames the first top-level {@link #ALT_NAMESPACE_FIELD} element back to the namespace field
     * by overwriting the field-name bytes directly in the document's backing array.
     *
     * @param rawBsonDocument the document to patch; it is modified in place
     * @return the same {@code rawBsonDocument} instance
     */
    static RawBsonDocument convertDocument(RawBsonDocument rawBsonDocument) {
        ByteBuffer buffer = rawBsonDocument.getByteBuffer().asNIO();
        BsonBinaryReader reader = new BsonBinaryReader(buffer);
        reader.readStartDocument();
        // Start of the current element, taken from the reader after the document header so that a
        // match on the very first element cannot write into the length prefix.
        int elementStart = reader.getBsonInput().getPosition();
        while (reader.readBsonType() != BsonType.END_OF_DOCUMENT) {
            if (ALT_NAMESPACE_FIELD.equals(reader.readName())) {
                // Skip the one-byte element type; arrayOffset() maps the buffer position onto the
                // backing array.
                int nameStart = buffer.arrayOffset() + elementStart + 1;
                System.arraycopy(NAMESPACE_BYTES, 0, buffer.array(), nameStart, NAMESPACE_BYTES.length);
                return rawBsonDocument;
            }
            reader.skipValue();
            elementStart = reader.getBsonInput().getPosition();
        }
        return rawBsonDocument;
    }

    /**
     * Lists the collections of every database whose name does not start with {@code admin},
     * {@code config} or {@code local}.
     */
    private static List<MongoNamespace> getCollections(MongoClient mongoClient) {
        return mongoClient.listDatabaseNames().into(new ArrayList<String>()).stream()
                .filter(name -> !(name.startsWith("admin") || name.startsWith("config") || name.startsWith("local")))
                .map(name -> getCollections(mongoClient, name))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /** Lists the collections of the given database, skipping names that start with {@code system.}. */
    public static List<MongoNamespace> getCollections(MongoClient mongoClient, String str) {
        return mongoClient.getDatabase(str).listCollectionNames().into(new ArrayList<String>()).stream()
                .filter(collectionName -> !collectionName.startsWith("system."))
                .map(collectionName -> createNamespace(str, collectionName))
                .collect(Collectors.toList());
    }

    /** Creates the namespace for the given database name ({@code str}) and collection name ({@code str2}). */
    public static MongoNamespace createNamespace(String str, String str2) {
        return new MongoNamespace(str, str2);
    }
}
