package io.debezium.relational.history;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotContext;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.doc.FixFor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/relational/history/KafkaDatabaseHistoryTest.class */
public class KafkaDatabaseHistoryTest {
    private static KafkaCluster kafka;
    private KafkaDatabaseHistory history;
    private Offsets<Partition, MySqlOffsetContext> offsets;
    private MySqlOffsetContext position;
    private static final int PARTITION_NO = 0;

    @BeforeClass
    public static void startKafka() throws Exception {
        File createTestingDirectory = Testing.Files.createTestingDirectory("history_cluster");
        Testing.Files.delete(createTestingDirectory);
        kafka = new KafkaCluster().usingDirectory(createTestingDirectory).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf("auto.create.topics.enable", "false", "zookeeper.session.timeout.ms", "20000")).startup();
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    @Before
    public void beforeEach() throws Exception {
        MySqlPartition mySqlPartition = new MySqlPartition("my-server");
        this.position = new MySqlOffsetContext(false, true, new TransactionContext(), new MySqlReadOnlyIncrementalSnapshotContext(), new SourceInfo(new MySqlConnectorConfig(Configuration.empty().edit().with(RelationalDatabaseConnectorConfig.SERVER_NAME, "dbserver1").build())));
        this.offsets = Offsets.of(mySqlPartition, this.position);
        setLogPosition(PARTITION_NO);
        this.history = new KafkaDatabaseHistory();
    }

    @After
    public void afterEach() {
        try {
            if (this.history != null) {
                this.history.stop();
            }
        } finally {
            this.history = null;
        }
    }

    @Test
    public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
        kafka.createTopic("empty-and-recovery-schema-changes", 1, 1);
        testHistoryTopicContent("empty-and-recovery-schema-changes", false);
    }

    private void testHistoryTopicContent(String str, boolean z) {
        Configuration build = Configuration.create().with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList()).with(KafkaDatabaseHistory.TOPIC, str).with(DatabaseHistory.NAME, "my-db-history").with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500).with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100).with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000).with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, z).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector").with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test").build();
        this.history.configure(build, (HistoryRecordComparator) null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();
        this.history.start();
        this.history.initializeStorage();
        this.history.initializeStorage();
        MySqlAntlrDdlParser mySqlAntlrDdlParser = new MySqlAntlrDdlParser();
        MySqlAntlrDdlParser mySqlAntlrDdlParser2 = new MySqlAntlrDdlParser();
        mySqlAntlrDdlParser2.setCurrentSchema("db1");
        Tables tables = new Tables();
        Tables tables2 = new Tables();
        Tables tables3 = new Tables();
        setLogPosition(PARTITION_NO);
        this.history.recover(this.offsets, tables, mySqlAntlrDdlParser);
        Assertions.assertThat(tables.size()).isEqualTo(PARTITION_NO);
        setLogPosition(10);
        this.history.record(this.offsets.getTheOnlyPartition().getSourcePartition(), this.offsets.getTheOnlyOffset().getOffset(), "db1", "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL); \n");
        mySqlAntlrDdlParser2.parse("CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL); \n", tables);
        Assertions.assertThat(tables.size()).isEqualTo(3);
        mySqlAntlrDdlParser2.parse("CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL); \n", tables2);
        Assertions.assertThat(tables2.size()).isEqualTo(3);
        mySqlAntlrDdlParser2.parse("CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL); \n", tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(3);
        setLogPosition(39);
        this.history.record(this.offsets.getTheOnlyPartition().getSourcePartition(), this.offsets.getTheOnlyOffset().getOffset(), "db1", "DROP TABLE foo;");
        mySqlAntlrDdlParser2.parse("DROP TABLE foo;", tables2);
        Assertions.assertThat(tables2.size()).isEqualTo(2);
        mySqlAntlrDdlParser2.parse("DROP TABLE foo;", tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(2);
        setLogPosition(10003);
        this.history.record(this.offsets.getTheOnlyPartition().getSourcePartition(), this.offsets.getTheOnlyOffset().getOffset(), "db1", "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);");
        mySqlAntlrDdlParser2.parse("CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);", tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(3);
        this.history.stop();
        this.history = new KafkaDatabaseHistory();
        this.history.configure(build, (HistoryRecordComparator) null, DatabaseHistoryListener.NOOP, true);
        Tables tables4 = new Tables();
        setLogPosition(15);
        this.history.recover(this.offsets, tables4, mySqlAntlrDdlParser);
        Assertions.assertThat(tables4).isEqualTo(tables);
        Tables tables5 = new Tables();
        setLogPosition(50);
        this.history.recover(this.offsets, tables5, mySqlAntlrDdlParser);
        Assertions.assertThat(tables5).isEqualTo(tables2);
        Tables tables6 = new Tables();
        setLogPosition(10010);
        this.history.recover(this.offsets, tables6, mySqlAntlrDdlParser);
        Assertions.assertThat(tables6).isEqualTo(tables3);
        Tables tables7 = new Tables();
        setLogPosition(100000010);
        this.history.recover(this.offsets, tables7, mySqlAntlrDdlParser);
        Assertions.assertThat(tables7).isEqualTo(tables3);
    }

    protected void setLogPosition(int i) {
        this.position.setBinlogStartPoint("my-txn-file.log", i);
    }

    @Test
    public void shouldIgnoreUnparseableMessages() throws Exception {
        kafka.createTopic("ignore-unparseable-schema-changes", 1, 1);
        ProducerRecord producerRecord = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, (Object) null);
        ProducerRecord producerRecord2 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "");
        ProducerRecord producerRecord3 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord producerRecord4 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord producerRecord5 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"");
        ProducerRecord producerRecord6 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord producerRecord7 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
        ProducerRecord producerRecord8 = new ProducerRecord("ignore-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"CREATE DEFINER=`myUser`@`%` PROCEDURE `tableAFetchCount`(        in p_uniqueID int        )BEGINselect count(*) into @propCount from tableA  where uniqueID = p_uniqueID;    select count(*) into @completeCount from tableA  where uniqueID = p_uniqueID and isComplete = 1;       select  uniqueID,   @propCount as propCount, @completeCount as completeCount, @completeCount/ @propCount * 100 as completePct        where uniqueID = p_uniqueID;END\"}");
        KafkaProducer kafkaProducer = new KafkaProducer(Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "intruder").withDefault("key.serializer", StringSerializer.class).withDefault("value.serializer", StringSerializer.class).build().asProperties());
        try {
            kafkaProducer.send(producerRecord).get();
            kafkaProducer.send(producerRecord2).get();
            kafkaProducer.send(producerRecord3).get();
            kafkaProducer.send(producerRecord4).get();
            kafkaProducer.send(producerRecord5).get();
            kafkaProducer.send(producerRecord6).get();
            kafkaProducer.send(producerRecord7).get();
            kafkaProducer.send(producerRecord8).get();
            kafkaProducer.close();
            testHistoryTopicContent("ignore-unparseable-schema-changes", true);
        } catch (Throwable th) {
            try {
                kafkaProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test(expected = ParsingException.class)
    public void shouldStopOnUnparseableSQL() throws Exception {
        kafka.createTopic("stop-on-unparseable-schema-changes", 1, 1);
        ProducerRecord producerRecord = new ProducerRecord("stop-on-unparseable-schema-changes", Integer.valueOf(PARTITION_NO), (Object) null, "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
        KafkaProducer kafkaProducer = new KafkaProducer(Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "intruder").withDefault("key.serializer", StringSerializer.class).withDefault("value.serializer", StringSerializer.class).build().asProperties());
        try {
            kafkaProducer.send(producerRecord).get();
            kafkaProducer.close();
            testHistoryTopicContent("stop-on-unparseable-schema-changes", false);
        } catch (Throwable th) {
            try {
                kafkaProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testExists() {
        testHistoryTopicContent("exists-schema-changes", true);
        Assert.assertTrue(this.history.exists());
        this.history.configure(Configuration.create().with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList()).with(KafkaDatabaseHistory.TOPIC, "dummytopic").with(DatabaseHistory.NAME, "my-db-history").with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500).with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100).with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000).with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector").with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test").build(), (HistoryRecordComparator) null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();
        Assert.assertFalse(this.history.exists());
    }

    @Test
    @FixFor({"DBZ-1886"})
    public void differentiateStorageExistsFromHistoryExists() {
        this.history.configure(Configuration.create().with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList()).with(KafkaDatabaseHistory.TOPIC, "differentiate-storage-exists-schema-changes").with(DatabaseHistory.NAME, "my-db-history").with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500).with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100).with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000).build(), (HistoryRecordComparator) null, DatabaseHistoryMetrics.NOOP, true);
        Assert.assertFalse(this.history.storageExists());
        this.history.initializeStorage();
        Assert.assertTrue(this.history.storageExists());
        Assert.assertFalse(this.history.exists());
        this.history.start();
        setLogPosition(PARTITION_NO);
        this.history.record(this.offsets.getTheOnlyPartition().getSourcePartition(), this.offsets.getTheOnlyOffset().getOffset(), "db1", "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n");
        Assert.assertTrue(this.history.exists());
        Assert.assertTrue(this.history.storageExists());
    }

    @Test
    @FixFor({"DBZ-2144"})
    public void shouldValidateMandatoryValues() {
        Assertions.assertThat(Configuration.create().build().validate(KafkaDatabaseHistory.ALL_FIELDS).keySet()).isEqualTo(Collect.unmodifiableSet(new String[]{"database.history.name", "database.history.connector.class", "database.history.kafka.topic", "database.history.kafka.bootstrap.servers", "database.history.kafka.recovery.poll.interval.ms", "database.history.connector.id", "database.history.kafka.recovery.attempts"}));
    }
}
