package io.debezium.connector.mongodb;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.InsertOneOptions;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mongodb/MongoDbConnectorIT.class */
public class MongoDbConnectorIT extends AbstractConnectorTest {
    private Configuration config;
    private ReplicationContext context;

    @Before
    public void beforeEach() {
        Testing.Debug.disable();
        Testing.Print.disable();
        stopConnector();
        initializeConnectorTestFramework();
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
        } finally {
            if (this.context != null) {
                this.context.shutdown();
            }
        }
    }

    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        this.config = Configuration.create().with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, "true").build();
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");
        start(MongoDbConnector.class, this.config, (z, str, th) -> {
            Assertions.assertThat(z).isFalse();
            Assertions.assertThat(th).isNotNull();
        });
        assertConnectorNotRunning();
    }

    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        Config validate = new MongoDbConnector().validate(Configuration.create().build().asMap());
        assertConfigurationErrors(validate, MongoDbConnectorConfig.HOSTS, 1);
        assertConfigurationErrors(validate, MongoDbConnectorConfig.LOGICAL_NAME, 1);
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.USER});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.DATABASE_LIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.COLLECTION_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.COLLECTION_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_COPY_THREADS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_QUEUE_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_BATCH_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.POLL_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS});
    }

    @Test
    public void shouldValidateAcceptableConfiguration() {
        this.config = Configuration.create().with(MongoDbConnectorConfig.HOSTS, System.getProperty("connector.mongodb.hosts")).with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, System.getProperty("connector.mongodb.members.auto.discover")).with(MongoDbConnectorConfig.LOGICAL_NAME, System.getProperty("connector.mongodb.name")).build();
        this.context = new ReplicationContext(this.config);
        storeDocuments("dbval", "validationColl1", "simple_objects.json");
        storeDocuments("dbval2", "validationColl2", "restaurants1.json");
        Config validate = new MongoDbConnector().validate(this.config.asMap());
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.HOSTS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.LOGICAL_NAME});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.USER});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.PASSWORD});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.DATABASE_LIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.COLLECTION_WHITELIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.COLLECTION_BLACKLIST});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_COPY_THREADS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_QUEUE_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_BATCH_SIZE});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.POLL_INTERVAL_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS});
        assertNoConfigurationErrors(validate, new Field[]{MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS});
        List validValues = MongoDbConnectorConfig.DATABASE_LIST.recommender().validValues(MongoDbConnectorConfig.DATABASE_LIST, this.config);
        Testing.debug("List of dbNames: " + validValues);
        Assertions.assertThat(validValues).contains(new Object[]{"dbval", "dbval2"});
        Field.Recommender recommender = MongoDbConnectorConfig.COLLECTION_WHITELIST.recommender();
        List validValues2 = recommender.validValues(MongoDbConnectorConfig.COLLECTION_WHITELIST, this.config);
        Testing.debug("List of collection names: " + validValues2);
        Assertions.assertThat(validValues2).isEmpty();
        List validValues3 = recommender.validValues(MongoDbConnectorConfig.COLLECTION_WHITELIST, this.config.edit().with(MongoDbConnectorConfig.DATABASE_LIST, "dbval").build());
        Assertions.assertThat(validValues3).containsOnly(new Object[]{"dbval.validationColl1"});
        Testing.debug("List of collection names: " + validValues3);
    }

    @Test
    public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException {
        this.config = Configuration.create().with(MongoDbConnectorConfig.HOSTS, System.getProperty("connector.mongodb.hosts")).with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, System.getProperty("connector.mongodb.members.auto.discover")).with(MongoDbConnectorConfig.LOGICAL_NAME, System.getProperty("connector.mongodb.name")).with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10).with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "dbit.*").with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo").build();
        this.context = new ReplicationContext(this.config);
        storeDocuments("dbit", "simpletons", "simple_objects.json");
        storeDocuments("dbit", "restaurants", "restaurants1.json");
        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(12);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(2);
        consumeRecordsByTopic.forEach(sourceRecord -> {
            validate(sourceRecord);
            verifyFromInitialSync(sourceRecord);
            verifyReadOperation(sourceRecord);
        });
        storeDocuments("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(4);
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
        consumeRecordsByTopic2.forEach(sourceRecord2 -> {
            validate(sourceRecord2);
            verifyNotFromInitialSync(sourceRecord2);
            verifyCreateOperation(sourceRecord2);
        });
        stopConnector();
        storeDocuments("dbit", "restaurants", "restaurants3.json");
        start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(5);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5);
        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
        consumeRecordsByTopic3.forEach(sourceRecord3 -> {
            validate(sourceRecord3);
            verifyNotFromInitialSync(sourceRecord3);
            verifyCreateOperation(sourceRecord3);
        });
        storeDocuments("dbit", "restaurants", "restaurants4.json");
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(8);
        Assertions.assertThat(consumeRecordsByTopic4.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(8);
        Assertions.assertThat(consumeRecordsByTopic4.topics().size()).isEqualTo(1);
        consumeRecordsByTopic4.forEach(sourceRecord4 -> {
            validate(sourceRecord4);
            verifyNotFromInitialSync(sourceRecord4);
            verifyCreateOperation(sourceRecord4);
        });
    }

    protected void verifyFromInitialSync(SourceRecord sourceRecord) {
        Assertions.assertThat(sourceRecord.sourceOffset().containsKey("initsync")).isTrue();
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getBoolean("initsync")).isTrue();
    }

    protected void verifyNotFromInitialSync(SourceRecord sourceRecord) {
        Assertions.assertThat(sourceRecord.sourceOffset().containsKey("initsync")).isFalse();
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getBoolean("initsync")).isNull();
    }

    protected void verifyCreateOperation(SourceRecord sourceRecord) {
        verifyOperation(sourceRecord, Envelope.Operation.CREATE);
    }

    protected void verifyReadOperation(SourceRecord sourceRecord) {
        verifyOperation(sourceRecord, Envelope.Operation.READ);
    }

    protected void verifyUpdateOperation(SourceRecord sourceRecord) {
        verifyOperation(sourceRecord, Envelope.Operation.UPDATE);
    }

    protected void verifyDeleteOperation(SourceRecord sourceRecord) {
        verifyOperation(sourceRecord, Envelope.Operation.DELETE);
    }

    protected void verifyOperation(SourceRecord sourceRecord, Envelope.Operation operation) {
        Assertions.assertThat(((Struct) sourceRecord.value()).getString("op")).isEqualTo(operation.code());
    }

    protected ConnectionContext.MongoPrimary primary() {
        return this.context.primaryFor(ReplicaSet.parse(this.context.hosts()), connectionErrorHandler(3));
    }

    protected void storeDocuments(String str, String str2, String str3) {
        primary().execute("storing documents", mongoClient -> {
            Testing.debug("Storing in '" + str + "." + str2 + "' documents loaded from from '" + str3 + "'");
            MongoCollection<Document> collection = mongoClient.getDatabase(str).getCollection(str2);
            collection.drop();
            storeDocuments(collection, str3);
        });
    }

    protected void storeDocuments(MongoCollection<Document> mongoCollection, String str) {
        InsertOneOptions bypassDocumentValidation = new InsertOneOptions().bypassDocumentValidation(true);
        loadTestDocuments(str).forEach(document -> {
            Assertions.assertThat(document).isNotNull();
            Assertions.assertThat(document.size()).isGreaterThan(0);
            mongoCollection.insertOne(document, bypassDocumentValidation);
        });
    }

    protected List<Document> loadTestDocuments(String str) {
        InputStream readResourceAsStream;
        Throwable th;
        ArrayList arrayList = new ArrayList();
        try {
            readResourceAsStream = Testing.Files.readResourceAsStream(str);
            th = null;
        } catch (IOException e) {
            Assert.fail("Unable to find or read file '" + str + "': " + e.getMessage());
        }
        try {
            try {
                Assertions.assertThat(readResourceAsStream).isNotNull();
                IoUtil.readLines(readResourceAsStream, str2 -> {
                    Document parse = Document.parse(str2);
                    Assertions.assertThat(parse.size()).isGreaterThan(0);
                    arrayList.add(parse);
                });
                if (readResourceAsStream != null) {
                    if (0 != 0) {
                        try {
                            readResourceAsStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        readResourceAsStream.close();
                    }
                }
                return arrayList;
            } finally {
            }
        } finally {
        }
    }

    protected BiConsumer<String, Throwable> connectionErrorHandler(int i) {
        AtomicInteger atomicInteger = new AtomicInteger();
        return (str, th) -> {
            if (atomicInteger.incrementAndGet() > i) {
                Assert.fail("Unable to connect to primary after " + i + " errors trying to " + str + ": " + th);
            }
            this.logger.error("Error while attempting to {}: {}", new Object[]{str, th.getMessage(), th});
        };
    }
}
