package io.debezium.connector.mongodb.transforms;

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.RawBsonDocument;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelopeTestIT.class */
public class UnwrapFromMongoDbEnvelopeTestIT extends AbstractConnectorTest {
    private static final String DB_NAME = "transform";
    private static final String COLLECTION_NAME = "source";
    private static final String TOPIC_NAME = "mongo.transform.source";
    private Configuration config;
    private MongoDbTaskContext context;
    private UnwrapFromMongoDbEnvelope<SourceRecord> transformation;

    @Before
    public void beforeEach() {
        Testing.Debug.disable();
        Testing.Print.disable();
        stopConnector();
        initializeConnectorTestFramework();
        this.transformation = new UnwrapFromMongoDbEnvelope<>();
        this.transformation.configure(Collections.emptyMap());
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            this.transformation.close();
        } finally {
            if (this.context != null) {
                this.context.getConnectionContext().shutdown();
            }
        }
    }

    @Test
    public void shouldTransformEvents() throws InterruptedException, IOException {
        this.config = TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10).with(MongoDbConnectorConfig.COLLECTION_WHITELIST, "transform.*").with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo").build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(primary(), DB_NAME);
        start(MongoDbConnector.class, this.config);
        primary().execute("insert", mongoClient -> {
            mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).insertOne(Document.parse("{'_id': 1, 'dataStr': 'hello', 'dataInt': 123, 'dataLong': 80000000000}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1);
        SourceRecord apply = this.transformation.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TOPIC_NAME).get(0));
        Struct struct = (Struct) apply.value();
        Assertions.assertThat(apply.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataInt").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply.valueSchema().field("dataLong").schema()).isEqualTo(Schema.OPTIONAL_INT64_SCHEMA);
        Assertions.assertThat(struct.get("id")).isEqualTo(1);
        Assertions.assertThat(struct.get("dataStr")).isEqualTo("hello");
        Assertions.assertThat(struct.get("dataInt")).isEqualTo(123);
        Assertions.assertThat(struct.get("dataLong")).isEqualTo(80000000000L);
        primary().execute("update", mongoClient2 -> {
            mongoClient2.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).updateOne(RawBsonDocument.parse("{'_id' : 1}"), RawBsonDocument.parse("{'$set': {'dataStr': 'bye'}}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        if (((Struct) ((SourceRecord) consumeRecordsByTopic2.recordsForTopic(TOPIC_NAME).get(0)).value()).get("op").equals("c")) {
            consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        }
        Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic(TOPIC_NAME).size()).isEqualTo(1);
        SourceRecord apply2 = this.transformation.apply((SourceRecord) consumeRecordsByTopic2.recordsForTopic(TOPIC_NAME).get(0));
        Struct struct2 = (Struct) apply2.value();
        Assertions.assertThat(apply2.valueSchema().field("id").schema()).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA);
        Assertions.assertThat(apply2.valueSchema().field("dataStr").schema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(struct2.get("id")).isEqualTo(1);
        Assertions.assertThat(struct2.get("dataStr")).isEqualTo("bye");
        primary().execute("delete", mongoClient3 -> {
            mongoClient3.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).deleteOne(RawBsonDocument.parse("{'_id' : 1}"));
        });
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic3.recordsForTopic(TOPIC_NAME).size()).isEqualTo(2);
        Assertions.assertThat((Struct) this.transformation.apply((SourceRecord) consumeRecordsByTopic3.recordsForTopic(TOPIC_NAME).get(0)).value()).isNull();
        Assertions.assertThat(((SourceRecord) consumeRecordsByTopic3.recordsForTopic(TOPIC_NAME).get(1)).value()).isNull();
    }

    private ConnectionContext.MongoPrimary primary() {
        return this.context.getConnectionContext().primaryFor(ReplicaSet.parse(this.context.getConnectionContext().hosts()), this.context.filters(), connectionErrorHandler(3));
    }

    private BiConsumer<String, Throwable> connectionErrorHandler(int i) {
        AtomicInteger atomicInteger = new AtomicInteger();
        return (str, th) -> {
            if (atomicInteger.incrementAndGet() > i) {
                Assert.fail("Unable to connect to primary after " + i + " errors trying to " + str + ": " + th);
            }
            this.logger.error("Error while attempting to {}: {}", new Object[]{str, th.getMessage(), th});
        };
    }
}
