package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.RequireDatabaseOption;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.fest.assertions.ObjectAssert;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/oracle/OracleConnectorIT.class */
public class OracleConnectorIT extends AbstractConnectorTest {
    /** One second expressed in microseconds. */
    private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1);
    /** Source-offset key that the tests read to check whether the snapshot phase has finished. */
    private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";

    // JUnit rule; presumably skips tests depending on the configured connector adapter — confirm against the rule class.
    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    // JUnit rule; presumably skips tests depending on available database options — confirm against the rule class.
    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();
    /** Database connection shared by all tests; assigned in beforeClass() and closed in closeConnection(). */
    private static OracleConnection connection;

    /**
     * Opens the shared test connection and (re)creates the fixture tables used by this class.
     *
     * <p>For every fixture table the method drops any previous copy, creates the table, grants
     * {@code SELECT} on it to the connector user and enables supplemental logging for all columns.
     * Every statement uses the schema-qualified table name so that the {@code CREATE} always targets
     * the same object as the subsequent {@code GRANT} and {@code ALTER} statements.
     *
     * @throws SQLException if any of the setup statements fails
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();

        // Each entry is {schema-qualified table name, CREATE TABLE statement}.
        final String[][] fixtures = {
            {"debezium.customer",
                "create table debezium.customer (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))"},
            {"debezium.masked_hashed_column_table",
                "create table debezium.masked_hashed_column_table (  id numeric(9,0) not null,   name varchar2(255),   name2 varchar2(255),   name3 varchar2(20),  primary key (id))"},
            {"debezium.truncated_column_table",
                "create table debezium.truncated_column_table (  id numeric(9,0) not null,   name varchar2(20),   primary key (id))"},
            {"debezium.dt_table",
                "create table debezium.dt_table (  id numeric(9,0) not null,   c1 int,   c2 int,   c3a numeric(5,2),   c3b varchar(128),   f1 float(10),   f2 decimal(8,4),   primary key (id))"},
        };

        // Drop everything first so that a previous, aborted run cannot leave stale tables behind.
        for (String[] fixture : fixtures) {
            TestHelper.dropTable(connection, fixture[0]);
        }

        for (String[] fixture : fixtures) {
            final String table = fixture[0];
            connection.execute(fixture[1]);
            connection.execute("GRANT SELECT ON " + table + " to  " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
    }

    /**
     * Drops the tables created by this class and closes the shared connection.
     *
     * <p>The connection is closed in a {@code finally} block so that it is released even when one of
     * the drop statements fails. Table names are schema-qualified to match the names under which the
     * tables are created.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection == null) {
            // Setup never got as far as opening a connection; nothing to clean up.
            return;
        }
        try {
            TestHelper.dropTable(connection, "debezium.customer2");
            TestHelper.dropTable(connection, "debezium.customer");
            TestHelper.dropTable(connection, "debezium.masked_hashed_column_table");
            TestHelper.dropTable(connection, "debezium.truncated_column_table");
            TestHelper.dropTable(connection, "debezium.dt_table");
        } finally {
            connection.close();
        }
    }

    /**
     * Resets database and framework state before each test: drops the DBZ-800 tables, empties the
     * shared fixture tables, configures the consume timeout, initializes the connector test
     * framework and deletes the database history file.
     *
     * @throws SQLException if one of the cleanup statements fails
     */
    @Before
    public void before() throws SQLException {
        for (String leftover : new String[]{"debezium.dbz800a", "debezium.dbz800b"}) {
            TestHelper.dropTable(connection, leftover);
        }

        final String[] fixtureTables = {
            "debezium.customer",
            "debezium.masked_hashed_column_table",
            "debezium.truncated_column_table",
            "debezium.dt_table",
        };
        for (String table : fixtureTables) {
            connection.execute("delete from " + table);
        }

        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    /**
     * Verifies that a table whose (quoted) name contains a hyphen is captured both during the
     * snapshot and while streaming.
     *
     * <p>One row is inserted before the connector starts and must arrive as a snapshot read; a second
     * row is inserted once streaming is running and must arrive as an insert. The table is dropped in
     * a {@code finally} block so that cleanup runs exactly once, whether the test passes or fails.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    @FixFor({"DBZ-2452"})
    public void shouldSnapshotAndStreamWithHyphenedTableName() throws Exception {
        final String table = "debezium.\"my-table\"";
        TestHelper.dropTable(connection, table);
        try {
            // Schema-qualified like the GRANT/ALTER/INSERT statements that follow.
            connection.execute("create table debezium.\"my-table\" ( id numeric(9,0) not null,  c1 int,  c2 varchar(128),  primary key (id))");
            connection.execute("GRANT SELECT ON debezium.\"my-table\" to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.\"my-table\" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.\"my-table\" VALUES (1, 25, 'Test')");
            connection.execute("COMMIT");

            start(OracleConnector.class,
                    TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute("INSERT INTO debezium.\"my-table\" VALUES (2, 50, 'Test2')");
            connection.execute("COMMIT");

            final List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.my-table");
            Assertions.assertThat(records).hasSize(2);

            // Row 1 existed before the connector started, so it is the only (and last) snapshot record.
            final SourceRecord snapshotRecord = records.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            final Struct snapshotAfter = (Struct) ((Struct) snapshotRecord.value()).get("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("C1")).isEqualTo(BigDecimal.valueOf(25L));
            Assertions.assertThat(snapshotAfter.get("C2")).isEqualTo("Test");
            Assertions.assertThat(snapshotRecord.sourceOffset().get("snapshot")).isEqualTo(true);
            Assertions.assertThat(snapshotRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

            // Row 2 was inserted while streaming.
            final SourceRecord streamedRecord = records.get(1);
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            final Struct streamedAfter = (Struct) ((Struct) streamedRecord.value()).get("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("C1")).isEqualTo(BigDecimal.valueOf(50L));
            Assertions.assertThat(streamedAfter.get("C2")).isEqualTo("Test2");
        } finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Inserts two customer rows before the connector starts and checks that both are emitted as
     * snapshot reads, with the second one flagged as the last snapshot record.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    public void shouldTakeSnapshot() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int expectedCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> customers = consumeRecordsByTopic(expectedCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customers).hasSize(expectedCount);

        // First row: snapshot still in progress.
        final SourceRecord billie = customers.get(0);
        VerifyRecord.isValidRead(billie, "ID", 1);
        final Struct billieValue = (Struct) billie.value();
        final Struct billieAfter = (Struct) billieValue.get("after");
        Assertions.assertThat(billieAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(billieAfter.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(billieAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        Assertions.assertThat(billieAfter.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));
        final Map<String, ?> billieOffset = billie.sourceOffset();
        Assertions.assertThat(billieOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(billieOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
        final Struct billieSource = (Struct) billieValue.get("source");
        Assertions.assertThat(billieSource.get("snapshot")).isEqualTo("true");

        // Second row: final snapshot record.
        final SourceRecord bruce = customers.get(1);
        VerifyRecord.isValidRead(bruce, "ID", 2);
        final Struct bruceValue = (Struct) bruce.value();
        final Struct bruceAfter = (Struct) bruceValue.get("after");
        Assertions.assertThat(bruceAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(bruceAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(bruceAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(2345.67d));
        Assertions.assertThat(bruceAfter.get("REGISTERED")).isNull();
        final Map<String, ?> bruceOffset = bruce.sourceOffset();
        Assertions.assertThat(bruceOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(bruceOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
        final Struct bruceSource = (Struct) bruceValue.get("source");
        Assertions.assertThat(bruceSource.get("snapshot")).isEqualTo("last");
    }

    /**
     * Runs the snapshot-then-stream scenario against the customer table using the default
     * configuration restricted to {@code DEBEZIUM.CUSTOMER}.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
        final Configuration customerOnly = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();
        continueStreamingAfterSnapshot(customerOnly);
    }

    /**
     * Snapshots two pre-existing customer rows, verifies their offsets and source metadata, then
     * inserts a third row and verifies that it is emitted as a streamed insert without snapshot
     * markers in its offset.
     *
     * @param configuration connector configuration used to start the connector
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    private void continueStreamingAfterSnapshot(Configuration configuration) throws Exception {
        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotCount = 2;

        start(OracleConnector.class, configuration);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotCount);

        // First snapshot record.
        final SourceRecord firstRead = snapshotRecords.get(0);
        VerifyRecord.isValidRead(firstRead, "ID", 1);
        final Struct firstValue = (Struct) firstRead.value();
        Assertions.assertThat(((Struct) firstValue.get("after")).get("ID")).isEqualTo(1);

        final Struct snapshotSource = (Struct) firstValue.get("source");
        Assertions.assertThat(snapshotSource.get("snapshot")).isEqualTo("true");
        Assertions.assertThat(snapshotSource.get("scn")).isNotNull();
        Assertions.assertThat(snapshotSource.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(snapshotSource.get("version")).isNotNull();
        Assertions.assertThat(snapshotSource.get("txId")).isNull();
        Assertions.assertThat(snapshotSource.get("ts_ms")).isNotNull();

        final Map<String, ?> firstOffset = firstRead.sourceOffset();
        Assertions.assertThat(firstOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(firstOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        // Second (last) snapshot record.
        final SourceRecord secondRead = snapshotRecords.get(1);
        VerifyRecord.isValidRead(secondRead, "ID", 2);
        final Struct secondAfter = (Struct) ((Struct) secondRead.value()).get("after");
        Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
        final Map<String, ?> secondOffset = secondRead.sourceOffset();
        Assertions.assertThat(secondOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(secondOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

        // Row inserted after the snapshot: expected via streaming.
        connection.execute("INSERT INTO debezium.customer VALUES (3, 'Brian', 2345.67, null)");
        connection.execute("COMMIT");
        final int streamedCount = 1;

        final List<SourceRecord> streamedRecords = consumeRecordsByTopic(streamedCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(streamedRecords).hasSize(streamedCount);

        final SourceRecord streamedInsert = streamedRecords.get(0);
        VerifyRecord.isValidInsert(streamedInsert, "ID", 3);
        final Struct streamedValue = (Struct) streamedInsert.value();
        Assertions.assertThat(((Struct) streamedValue.get("after")).get("ID")).isEqualTo(3);
        Assertions.assertThat(streamedInsert.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(streamedInsert.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();

        final Struct streamingSource = (Struct) streamedValue.get("source");
        Assertions.assertThat(streamingSource.get("snapshot")).isEqualTo("false");
        Assertions.assertThat(streamingSource.get("scn")).isNotNull();
        Assertions.assertThat(streamingSource.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(streamingSource.get("version")).isNotNull();
        Assertions.assertThat(streamingSource.get("txId")).isNotNull();
        Assertions.assertThat(streamingSource.get("ts_ms")).isNotNull();
    }

    /**
     * Verifies the snapshot of two customer rows and then streams two transactions of 30 inserts
     * each (ids starting at 100 and at 200), checking every resulting record.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    @FixFor({"DBZ-1223"})
    public void shouldStreamTransaction() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotCount);

        // First snapshot record, including its source block.
        final SourceRecord firstRead = snapshotRecords.get(0);
        VerifyRecord.isValidRead(firstRead, "ID", 1);
        final Struct firstValue = (Struct) firstRead.value();
        Assertions.assertThat(((Struct) firstValue.get("after")).get("ID")).isEqualTo(1);

        final Struct source = (Struct) firstValue.get("source");
        Assertions.assertThat(source.get("snapshot")).isEqualTo("true");
        Assertions.assertThat(source.get("scn")).isNotNull();
        Assertions.assertThat(source.get("name")).isEqualTo(TestHelper.SERVER_NAME);
        Assertions.assertThat(source.get("version")).isNotNull();
        Assertions.assertThat(source.get("txId")).isNull();
        Assertions.assertThat(source.get("ts_ms")).isNotNull();

        final Map<String, ?> firstOffset = firstRead.sourceOffset();
        Assertions.assertThat(firstOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(firstOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        // Second (last) snapshot record.
        final SourceRecord secondRead = snapshotRecords.get(1);
        VerifyRecord.isValidRead(secondRead, "ID", 2);
        final Struct secondAfter = (Struct) ((Struct) secondRead.value()).get("after");
        Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
        final Map<String, ?> secondOffset = secondRead.sourceOffset();
        Assertions.assertThat(secondOffset.get("snapshot")).isEqualTo(true);
        Assertions.assertThat(secondOffset.get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

        // Stream two multi-row transactions.
        connection.setAutoCommit(false);
        final int batchSize = 30;
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);
    }

    /**
     * Inserts a contiguous range of customer rows in a single transaction and then asserts that the
     * matching change events are received.
     *
     * <p>Auto-commit is switched off for the duration of the inserts if it was on, and switched back
     * on after the commit.
     *
     * @param configuration connector configuration, forwarded to {@code assertTxBatch}
     * @param i number of rows to insert
     * @param i2 id of the first row; ids {@code i2 .. i2 + i - 1} are used
     * @throws SQLException if a database statement fails
     * @throws InterruptedException if interrupted while consuming records
     */
    private void sendTxBatch(Configuration configuration, int i, int i2) throws SQLException, InterruptedException {
        final boolean autoCommitWasOn = connection.connection().getAutoCommit();
        if (autoCommitWasOn) {
            connection.connection().setAutoCommit(false);
        }

        final int endExclusive = i + i2;
        for (int id = i2; id < endExclusive; id++) {
            final String insert = String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id);
            connection.executeWithoutCommitting(insert);
        }
        connection.connection().commit();

        if (autoCommitWasOn) {
            connection.connection().setAutoCommit(true);
        }

        assertTxBatch(configuration, i, i2);
    }

    /**
     * Consumes a batch of customer change events and asserts that they are streamed inserts with
     * consecutive ids and complete source metadata.
     *
     * <p>When the configured adapter is not {@code LogMiner} (case-insensitive), the offset must
     * contain {@code lcr_position} and no {@code scn}, and the source block must carry an
     * {@code lcr_position}.
     *
     * @param configuration connector configuration from which the adapter name is read
     * @param i number of records expected
     * @param i2 id expected on the first record; record {@code k} must have id {@code i2 + k}
     * @throws InterruptedException if interrupted while consuming records
     */
    private void assertTxBatch(Configuration configuration, int i, int i2) throws InterruptedException {
        final List<SourceRecord> batch = consumeRecordsByTopic(i).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(batch).hasSize(i);

        final String adapter = configuration.getString(OracleConnectorConfig.CONNECTOR_ADAPTER);
        final boolean logMiner = "LogMiner".equalsIgnoreCase(adapter);

        int index = 0;
        while (index < i) {
            final int expectedId = index + i2;
            final SourceRecord record = batch.get(index);
            VerifyRecord.isValidInsert(record, "ID", expectedId);

            final Struct value = (Struct) record.value();
            final Struct after = (Struct) value.get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(expectedId);

            Assertions.assertThat(record.sourceOffset().containsKey("snapshot")).isFalse();
            Assertions.assertThat(record.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
            if (!logMiner) {
                Assertions.assertThat(record.sourceOffset().containsKey("lcr_position")).isTrue();
                Assertions.assertThat(record.sourceOffset().containsKey("scn")).isFalse();
            }

            final Struct source = (Struct) value.get("source");
            Assertions.assertThat(source.get("snapshot")).isEqualTo("false");
            Assertions.assertThat(source.get("scn")).isNotNull();
            if (!logMiner) {
                Assertions.assertThat(source.get("lcr_position")).isNotNull();
            }
            Assertions.assertThat(source.get("name")).isEqualTo(TestHelper.SERVER_NAME);
            Assertions.assertThat(source.get("version")).isNotNull();
            Assertions.assertThat(source.get("txId")).isNotNull();
            Assertions.assertThat(source.get("ts_ms")).isNotNull();

            index++;
        }
    }

    /**
     * Verifies that streaming resumes after a connector restart: rows committed while the connector
     * is stopped must be delivered after it starts again, followed by further live batches.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    public void shouldStreamAfterRestart() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        final int snapshotCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotCount);

        // Two batches while the connector is running.
        final int batchSize = 30;
        connection.setAutoCommit(false);
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);

        // One batch committed while the connector is down.
        stopConnector();
        final int offlineStart = 300;
        for (int id = offlineStart; id < batchSize + offlineStart; id++) {
            connection.executeWithoutCommitting(
                    String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id));
        }
        connection.connection().commit();

        // After the restart the offline batch must be delivered, then live batches continue.
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        assertTxBatch(config, batchSize, offlineStart);
        sendTxBatch(config, batchSize, 400);
        sendTxBatch(config, batchSize, 500);
    }

    /**
     * Stops the connector right after the snapshot records have been consumed, commits two more rows
     * while it is down, and verifies that they are streamed after the restart, followed by another
     * live batch.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    public void shouldStreamAfterRestartAfterSnapshot() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)");
        connection.execute("COMMIT");
        // The snapshot size doubles as the size of every later batch in this test.
        final int rowCount = 2;

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(rowCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(rowCount);
        stopConnector();

        // Rows committed while the connector is stopped.
        final int offlineStart = 100;
        connection.setAutoCommit(false);
        for (int id = offlineStart; id < rowCount + offlineStart; id++) {
            connection.executeWithoutCommitting(
                    String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id));
        }
        connection.connection().commit();
        connection.setAutoCommit(true);

        Testing.print("=== Starting connector second time ===");
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        assertTxBatch(config, rowCount, offlineStart);
        sendTxBatch(config, rowCount, 200);
    }

    /**
     * Streams an insert, a regular update, a primary-key update and a delete on the customer table
     * (schema-only snapshot) and verifies the seven resulting records.
     *
     * <p>The assertions expect: insert, update, then delete + tombstone + insert for the primary-key
     * change, then delete + tombstone for the final delete.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    public void shouldReadChangeStreamForExistingTable() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // Every statement is committed on its own.
        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        connection.execute("UPDATE debezium.customer SET name = 'Bruce', score = 2345.67, registered = TO_DATE('2018-03-23', 'yyyy-mm-dd') WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("UPDATE debezium.customer SET id = 2 WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("DELETE debezium.customer WHERE id = 2");
        connection.execute("COMMIT");

        // insert (1) + update (1) + primary-key update (3) + delete (2)
        final int expectedCount = 7;
        final List<SourceRecord> records = consumeRecordsByTopic(expectedCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(records).hasSize(expectedCount);

        final BigDecimal billieScore = BigDecimal.valueOf(1234.56d);
        final BigDecimal bruceScore = BigDecimal.valueOf(2345.67d);

        // [0] insert of id 1
        final SourceRecord inserted = records.get(0);
        VerifyRecord.isValidInsert(inserted, "ID", 1);
        final Struct insertedAfter = (Struct) ((Struct) inserted.value()).get("after");
        Assertions.assertThat(insertedAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(insertedAfter.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(insertedAfter.get("SCORE")).isEqualTo(billieScore);
        Assertions.assertThat(insertedAfter.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));
        final Map<String, ?> insertedOffset = inserted.sourceOffset();
        Assertions.assertThat(insertedOffset.get("snapshot")).isNull();
        Assertions.assertThat(insertedOffset.get(SNAPSHOT_COMPLETED_KEY)).isNull();

        // [1] update of id 1
        final SourceRecord updated = records.get(1);
        VerifyRecord.isValidUpdate(updated, "ID", 1);
        final Struct updatedBefore = (Struct) ((Struct) updated.value()).get("before");
        Assertions.assertThat(updatedBefore.get("ID")).isEqualTo(1);
        Assertions.assertThat(updatedBefore.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(updatedBefore.get("SCORE")).isEqualTo(billieScore);
        Assertions.assertThat(updatedBefore.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));
        final Struct updatedAfter = (Struct) ((Struct) updated.value()).get("after");
        Assertions.assertThat(updatedAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(updatedAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(updatedAfter.get("SCORE")).isEqualTo(bruceScore);
        Assertions.assertThat(updatedAfter.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0))));

        // [2..4] primary-key change 1 -> 2: delete of the old key, tombstone, insert of the new key
        final SourceRecord oldKeyDelete = records.get(2);
        VerifyRecord.isValidDelete(oldKeyDelete, "ID", 1);
        final Struct oldKeyBefore = (Struct) ((Struct) oldKeyDelete.value()).get("before");
        Assertions.assertThat(oldKeyBefore.get("ID")).isEqualTo(1);
        Assertions.assertThat(oldKeyBefore.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(oldKeyBefore.get("SCORE")).isEqualTo(bruceScore);
        Assertions.assertThat(oldKeyBefore.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0))));

        VerifyRecord.isValidTombstone(records.get(3));

        final SourceRecord newKeyInsert = records.get(4);
        VerifyRecord.isValidInsert(newKeyInsert, "ID", 2);
        final Struct newKeyAfter = (Struct) ((Struct) newKeyInsert.value()).get("after");
        Assertions.assertThat(newKeyAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(newKeyAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(newKeyAfter.get("SCORE")).isEqualTo(bruceScore);
        Assertions.assertThat(newKeyAfter.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0))));

        // [5..6] delete of id 2 and its tombstone
        final SourceRecord deleted = records.get(5);
        VerifyRecord.isValidDelete(deleted, "ID", 2);
        final Struct deletedBefore = (Struct) ((Struct) deleted.value()).get("before");
        Assertions.assertThat(deletedBefore.get("ID")).isEqualTo(2);
        Assertions.assertThat(deletedBefore.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(deletedBefore.get("SCORE")).isEqualTo(bruceScore);
        Assertions.assertThat(deletedBefore.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0))));

        VerifyRecord.isValidTombstone(records.get(6));
    }

    /**
     * With tombstones disabled, an insert / delete / insert sequence must yield exactly three
     * records; the delete record is checked in detail and must be followed directly by the second
     * insert.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    @FixFor({"DBZ-835"})
    public void deleteWithoutTombstone() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(OracleConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        connection.execute("DELETE debezium.customer WHERE id = 1");
        connection.execute("COMMIT");
        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        // insert + delete + insert, with no tombstone in between
        final int expectedCount = 3;
        final List<SourceRecord> records = consumeRecordsByTopic(expectedCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(records).hasSize(expectedCount);

        final SourceRecord deleteRecord = records.get(1);
        VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
        final Struct deletedRow = ((Struct) deleteRecord.value()).getStruct("before");
        Assertions.assertThat(deletedRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(deletedRow.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(deletedRow.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        Assertions.assertThat(deletedRow.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));

        VerifyRecord.isValidInsert(records.get(2), "ID", 2);
    }

    /**
     * Creates {@code debezium.customer2} only after the connector has completed its snapshot and
     * verifies that a row inserted into the new table is received as an insert event.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
        final String table = "debezium.customer2";
        TestHelper.dropTable(connection, table);

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER2")
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        // The table does not exist until the connector has finished its snapshot.
        connection.execute("create table debezium.customer2 (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))");
        TestHelper.streamTable(connection, table);
        connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER2");
        Assertions.assertThat(records).hasSize(1);

        final SourceRecord inserted = records.get(0);
        VerifyRecord.isValidInsert(inserted, "ID", 2);
        final Struct row = (Struct) ((Struct) inserted.value()).get("after");
        Assertions.assertThat(row.get("ID")).isEqualTo(2);
        Assertions.assertThat(row.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(row.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        Assertions.assertThat(row.get("REGISTERED"))
                .isEqualTo(Long.valueOf(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0))));
    }

    /**
     * With a heartbeat interval configured and only {@code DBZ800B} in the include list, rows are
     * written to both {@code dbz800a} and {@code dbz800b}. Records are collected until one for
     * {@code DBZ800B} shows up (at most 60 seconds); heartbeat records must then be present, no
     * record for {@code DBZ800A} may exist, and exactly one insert for {@code DBZ800B} is expected.
     *
     * @throws Exception if the connector cannot be started or a database statement fails
     */
    @Test
    @FixFor({"DBZ-800"})
    public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throws Exception {
        TestHelper.dropTable(connection, "debezium.dbz800a");
        TestHelper.dropTable(connection, "debezium.dbz800b");

        final Configuration config = TestHelper.defaultConfig()
                .with(Heartbeat.HEARTBEAT_INTERVAL, "1")
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ800B")
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("CREATE TABLE debezium.dbz800a (id NUMBER(9) NOT NULL, aaa VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("CREATE TABLE debezium.dbz800b (id NUMBER(9) NOT NULL, bbb VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("INSERT INTO debezium.dbz800a VALUES (1, 'AAA')");
        connection.execute("INSERT INTO debezium.dbz800b VALUES (2, 'BBB')");
        connection.execute("COMMIT");

        // Accumulates everything consumed so far; the first poll seeds it, later polls append to it.
        final AtomicReference<AbstractConnectorTest.SourceRecords> collected = new AtomicReference<>();
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            if (collected.get() == null) {
                collected.set(consumeRecordsByTopic(1));
            } else {
                consumeRecordsByTopic(1).allRecordsInOrder().forEach(collected.get()::add);
            }
            return collected.get().recordsForTopic("server1.DEBEZIUM.DBZ800B") != null;
        });

        final AbstractConnectorTest.SourceRecords all = collected.get();
        final List<SourceRecord> heartbeats = all.recordsForTopic("__debezium-heartbeat.server1");
        final List<SourceRecord> excludedTable = all.recordsForTopic("server1.DEBEZIUM.DBZ800A");
        final List<SourceRecord> includedTable = all.recordsForTopic("server1.DEBEZIUM.DBZ800B");

        Assertions.assertThat(heartbeats).isNotEmpty();
        Assertions.assertThat(excludedTable).isNull();
        Assertions.assertThat(includedTable).hasSize(1);
        VerifyRecord.isValidInsert(includedTable.get(0), "ID", 2);
    }

    /**
     * Runs the shared masked/truncated column scenario with the flag set to {@code true}, i.e.
     * with column selectors that are prefixed with the database name.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithDatabaseName() throws Exception {
        final boolean useDatabaseName = true;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(useDatabaseName);
    }

    /**
     * Runs the shared masked/truncated column scenario with the flag set to {@code false}, i.e.
     * with column selectors that start at the schema name.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithoutDatabaseName() throws Exception {
        final boolean useDatabaseName = false;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(useDatabaseName);
    }

    /**
     * Shared scenario checking that masked, hashed and truncated column settings are applied to
     * streamed inserts.
     *
     * <p>The connector runs with a schema-only snapshot. One row is inserted into
     * {@code masked_hashed_column_table} and one into {@code truncated_column_table}; the two
     * resulting records are then validated as inserts, and when an {@code after} struct is
     * present the masked, hashed and truncated values are compared with the expected literals.
     *
     * @param z when {@code true} every column selector is prefixed with the value of
     *          {@code TestHelper.getDatabaseName()} followed by a dot; when {@code false} the
     *          selectors start at the schema name
     * @throws Exception if a database statement or a connector interaction fails
     */
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean z) throws Exception {
        // The database name is only looked up when the selectors have to be fully qualified.
        final String prefix = z ? TestHelper.getDatabaseName() + "." : "";
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("column.mask.with.12.chars", prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME")
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K",
                        prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2," + prefix + "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3")
                .with("column.truncate.to.4.chars", prefix + "DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME")
                .build();

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')");
        connection.execute("INSERT INTO debezium.truncated_column_table VALUES(11, 'some_name')");
        connection.execute("COMMIT");

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
        final List<SourceRecord> maskedRecords = records.recordsForTopic("server1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE");
        final List<SourceRecord> truncatedRecords = records.recordsForTopic("server1.DEBEZIUM.TRUNCATED_COLUMN_TABLE");

        Assertions.assertThat(maskedRecords).hasSize(1);
        final SourceRecord maskedRecord = maskedRecords.get(0);
        VerifyRecord.isValidInsert(maskedRecord, "ID", 10);
        final Struct maskedAfter = ((Struct) maskedRecord.value()).getStruct("after");
        if (maskedAfter != null) {
            Assertions.assertThat(maskedAfter.getString("NAME")).isEqualTo("************");
            Assertions.assertThat(maskedAfter.getString("NAME2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
            Assertions.assertThat(maskedAfter.getString("NAME3")).isEqualTo("8e68c68edbbac316dfe2");
        }

        Assertions.assertThat(truncatedRecords).hasSize(1);
        final SourceRecord truncatedRecord = truncatedRecords.get(0);
        VerifyRecord.isValidInsert(truncatedRecord, "ID", 11);
        final Struct truncatedAfter = ((Struct) truncatedRecord.value()).getStruct("after");
        if (truncatedAfter != null) {
            Assertions.assertThat(truncatedAfter.getString("NAME")).isEqualTo("some");
        }

        stopConnector();
    }

    /**
     * Runs the shared key-rewrite scenario with the flag set to {@code true}, which selects the
     * message key column expression that carries a leading database-name pattern.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithDatabaseName() throws Exception {
        final boolean useDatabaseName = true;
        shouldRewriteIdentityKey(useDatabaseName);
    }

    /**
     * Runs the shared key-rewrite scenario with the flag set to {@code false}, which selects the
     * message key column expression without a database-name pattern.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithoutDatabaseName() throws Exception {
        final boolean useDatabaseName = false;
        shouldRewriteIdentityKey(useDatabaseName);
    }

    /**
     * Shared scenario checking that configured message key columns end up in the record key.
     *
     * <p>The connector runs with a schema-only snapshot and a {@code MSG_KEY_COLUMNS} setting
     * naming {@code id,name} for {@code debezium.customer}. After one row is inserted, the key of
     * the first record on the {@code CUSTOMER} topic must be non-null and must contain non-null
     * {@code ID} and {@code NAME} fields.
     *
     * @param z when {@code true} the key column expression is prefixed with the {@code (.*).}
     *          pattern; when {@code false} it starts at the schema name
     * @throws Exception if a database statement or a connector interaction fails
     */
    private void shouldRewriteIdentityKey(boolean z) throws Exception {
        final String keyColumns = z ? "(.*).debezium.customer:id,name" : "debezium.customer:id,name";
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(OracleConnectorConfig.MSG_KEY_COLUMNS, keyColumns)
                .build();

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.customer VALUES (3, 'Nest', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        final Object key = records.get(0).key();
        Assertions.assertThat(key).isNotNull();

        final Struct keyStruct = (Struct) key;
        Assertions.assertThat(keyStruct.get("ID")).isNotNull();
        Assertions.assertThat(keyStruct.get("NAME")).isNotNull();

        stopConnector();
    }

    /**
     * Verifies that source column type information is attached to the field schemas when
     * {@code datatype.propagate.source.type} matches the column's data type.
     *
     * <p>The connector runs with a schema-only snapshot and propagation enabled for
     * {@code NUMBER}, {@code VARCHAR2} and {@code FLOAT}. After one insert into
     * {@code debezium.dt_table}, the schema parameters of each column in the record's
     * {@code before} field schema are compared with the expected type name, length and, where
     * listed, scale.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT")
                .build();
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

        connection.execute("INSERT INTO debezium.dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
        connection.execute("COMMIT");

        final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DT_TABLE");
        Assertions.assertThat(records).hasSize(1);

        // Only the schema of the "before" field is inspected; its value is not read here.
        final Field before = records.get(0).valueSchema().field("before");

        Assertions.assertThat(before.schema().field("ID").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "9"),
                MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C1").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "38"),
                MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C2").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "38"),
                MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "0"));
        Assertions.assertThat(before.schema().field("C3A").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "5"),
                MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "2"));
        Assertions.assertThat(before.schema().field("C3B").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR2"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "128"));
        Assertions.assertThat(before.schema().field("F2").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "NUMBER"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8"),
                MapAssert.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "4"));
        Assertions.assertThat(before.schema().field("F1").schema().parameters()).includes(
                MapAssert.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "FLOAT"),
                MapAssert.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "10"));
    }

    /**
     * Verifies that a table whose DDL declares a numeric default value can be snapshotted and
     * then streamed.
     *
     * <p>{@code debezium.complex_ddl} is created with a {@code numeric default 1} column and one
     * row before the connector starts. The test expects one record after the snapshot and one
     * more record after a second insert issued once streaming is running.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2624")
    public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception {
        TestHelper.dropTable(connection, "debezium.complex_ddl");
        try {
            connection.execute("create table debezium.complex_ddl ( id numeric(6) constraint customers_id_nn not null,  name varchar2(100), value numeric default 1,  constraint customers_pk primary key(id))");
            connection.execute("GRANT SELECT ON debezium.complex_ddl to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.complex_ddl ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.complex_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COMPLEX_DDL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase: the pre-existing row.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1);

            // Streaming phase: a row inserted after the snapshot.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.complex_ddl (id, name)values (2, 'Acme2')");
            connection.commit();
            final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(streamedRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1);
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "debezium.complex_ddl");
        }
    }

    /**
     * Verifies that a range-partitioned table can be snapshotted and then streamed.
     *
     * <p>The {@code players} table is created with two partitions on {@code birth_date} and one
     * row before the connector starts. The test expects one record after the snapshot and two
     * records after two further inserts, whose dates fall into different partitions, are
     * committed while streaming.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2683")
    @RequireDatabaseOption("Partitioning")
    public void shouldSnapshotAndStreamChangesFromPartitionedTable() throws Exception {
        TestHelper.dropTable(connection, "players");
        try {
            connection.execute("CREATE TABLE players (id NUMERIC(6), name VARCHAR(100), birth_date DATE,primary key(id)) PARTITION BY RANGE (birth_date) (PARTITION p2019 VALUES LESS THAN (TO_DATE('2020-01-01', 'yyyy-mm-dd')), PARTITION p2020 VALUES LESS THAN (TO_DATE('2021-01-01', 'yyyy-mm-dd')))");
            connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', TO_DATE('2019-05-01', 'yyyy-mm-dd'))");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase: the pre-existing row.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1);

            // Streaming phase: one row per partition.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', TO_DATE('2019-06-26', 'yyyy-mm-dd'))");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', TO_DATE('2020-11-01', 'yyyy-mm-dd'))");
            connection.commit();
            final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(streamedRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2);
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "players");
        }
    }

    /**
     * Verifies that a column whose name contains {@code $} is exposed under a sanitized field
     * name when {@code SANITIZE_FIELD_NAMES} is enabled.
     *
     * <p>The {@code columns_test} table has a column named {@code amount$}. Both the snapshot
     * record and a subsequently streamed insert are expected to carry the value under the field
     * {@code AMOUNT_}, encoded as a variable-scale decimal.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2849")
    public void shouldAvroSerializeColumnsWithSpecialCharacters() throws Exception {
        TestHelper.dropTable(connection, "columns_test");
        try {
            connection.execute("CREATE TABLE columns_test (id NUMERIC(6), amount$ number not null, primary key(id))");
            connection.execute("GRANT SELECT ON debezium.columns_test to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.columns_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (1, 12345.67)");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COLUMNS_TEST")
                    .with(OracleConnectorConfig.SANITIZE_FIELD_NAMES, "true")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase: the pre-existing row is reported as a read.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.COLUMNS_TEST");
            Assertions.assertThat(snapshotRecords.size()).isEqualTo(1);
            final SourceRecord snapshotRecord = snapshotRecords.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            final Struct snapshotAfter = ((Struct) snapshotRecord.value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.getInt32("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("AMOUNT_")).isEqualTo(
                    VariableScaleDecimal.fromLogical(snapshotAfter.schema().field("AMOUNT_").schema(), BigDecimal.valueOf(12345.67d)));

            // Streaming phase: a new row is reported as an insert.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (2, 23456.78)");
            connection.commit();
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.COLUMNS_TEST");
            Assertions.assertThat(streamedRecords.size()).isEqualTo(1);
            final SourceRecord streamedRecord = streamedRecords.get(0);
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            final Struct streamedAfter = ((Struct) streamedRecord.value()).getStruct("after");
            Assertions.assertThat(streamedAfter.getInt32("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("AMOUNT_")).isEqualTo(
                    VariableScaleDecimal.fromLogical(streamedAfter.schema().field("AMOUNT_").schema(), BigDecimal.valueOf(23456.78d)));
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "columns_test");
        }
    }

    /**
     * Verifies that changes are still captured after the connector is restarted following a
     * forced flush of the redo logs to the archive logs.
     *
     * <p>The first connector run is expected to emit the row inserted before start-up. The
     * connector is then stopped, {@code TestHelper.forceFlushOfRedoLogsToArchiveLogs()} is
     * called, and the connector is started again with the same configuration; a row inserted
     * afterwards must be emitted as exactly one record.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2825")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Tests archive log support for LogMiner only")
    public void testArchiveLogScnBoundariesAreIncluded() throws Exception {
        TestHelper.dropTable(connection, "alog_test");
        try {
            connection.execute("CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))");
            connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.commit();
            connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')");
            connection.commit();

            // The same configuration is reused for both connector runs.
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST")
                    .build();

            // First run.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> firstRun = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.ALOG_TEST");
            Assertions.assertThat(firstRun.size()).isEqualTo(1);
            final Struct firstAfter = (Struct) ((Struct) firstRun.get(0).value()).get("after");
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(BigDecimal.valueOf(1L));
            Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Test");

            stopConnector();
            TestHelper.forceFlushOfRedoLogsToArchiveLogs();

            // Second run, after the flush.
            start(OracleConnector.class, config);
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')");
            connection.execute("COMMIT");
            final List<SourceRecord> secondRun = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.ALOG_TEST");
            Assertions.assertThat(secondRun.size()).isEqualTo(1);
            final Struct secondAfter = (Struct) ((Struct) secondRun.get(0).value()).get("after");
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(BigDecimal.valueOf(2L));
            Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Home");
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "alog_test");
        }
    }

    /**
     * Verifies that a {@code DATE} value yields the same emitted value whether it was inserted
     * as the string literal {@code '22-FEB-2018'} or through {@code TO_DATE}.
     *
     * <p>The string-literal row is inserted before the connector starts and read from the
     * snapshot; the {@code TO_DATE} row is inserted once streaming is running. Both records are
     * expected to carry {@code 1519257600000L} in {@code ORDER_DATE}.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2784")
    public void shouldConvertDatesSpecifiedAsStringInSQL() throws Exception {
        try {
            TestHelper.dropTable(connection, "orders");
            connection.execute("CREATE TABLE orders (id NUMERIC(6), order_date date not null,primary key(id))");
            connection.execute("GRANT SELECT ON debezium.orders TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.orders VALUES (9, '22-FEB-2018')");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.orders")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase: the row inserted with a string literal.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.ORDERS");
            Assertions.assertThat(snapshotRecords.size()).isEqualTo(1);
            final Struct snapshotAfter = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(9);
            Assertions.assertThat(snapshotAfter.get("ORDER_DATE")).isEqualTo(1519257600000L);

            // Streaming phase: the row inserted through TO_DATE.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.orders VALUES (10, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
            connection.execute("COMMIT");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.ORDERS");
            Assertions.assertThat(streamedRecords).hasSize(1);
            final Struct streamedAfter = ((Struct) streamedRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(10);
            Assertions.assertThat(streamedAfter.get("ORDER_DATE")).isEqualTo(1519257600000L);
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "orders");
        }
    }

    /**
     * Verifies that {@code NUMBER} columns are emitted as strings when
     * {@code DECIMAL_HANDLING_MODE} is set to {@code "string"}.
     *
     * <p>One row is read from the snapshot and a second one is streamed; for both, the
     * {@code after} struct is checked field by field against the expected string schemas and
     * values.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2733")
    public void shouldConvertNumericAsStringDecimalHandlingMode() throws Exception {
        TestHelper.dropTable(connection, "table_number_pk");
        try {
            connection.execute("CREATE TABLE table_number_pk (id NUMBER, name varchar2(255), age number, primary key (id))");
            connection.execute("GRANT SELECT ON debezium.table_number_pk TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.table_number_pk ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (1, 'Bob', 25)");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.table_number_pk")
                    .with(OracleConnectorConfig.DECIMAL_HANDLING_MODE, "string")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase.
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final List<SchemaAndValueField> expectedSnapshot = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "1"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Bob"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "25"));
            assertRecordSchemaAndValues(expectedSnapshot, snapshotRecords.get(0), "after");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (2, 'Sue', 30)");
            connection.execute("COMMIT");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK");
            Assertions.assertThat(streamedRecords).hasSize(1);
            final List<SchemaAndValueField> expectedStreamed = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "2"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Sue"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "30"));
            assertRecordSchemaAndValues(expectedStreamed, streamedRecords.get(0), "after");
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, "table_number_pk");
        }
    }

    /**
     * Checks one struct of a record's value against a list of expected fields.
     *
     * <p>When {@code list} is {@code null} the named struct itself must be {@code null}.
     * Otherwise the struct must be present and every entry of {@code list} is asked to assert
     * itself against it.
     *
     * @param list         expected fields, or {@code null} to require that the struct is absent
     * @param sourceRecord record whose value is read as a {@link Struct}
     * @param str          name of the struct field inside the record value, e.g. {@code "after"}
     */
    protected void assertRecordSchemaAndValues(List<SchemaAndValueField> list, SourceRecord sourceRecord, String str) {
        final Struct envelope = (Struct) sourceRecord.value();
        final Struct content = envelope.getStruct(str);

        if (list == null) {
            Assertions.assertThat(content).isNull();
            return;
        }

        Assertions.assertThat(content).as("expected there to be content in Envelope under " + str).isNotNull();
        for (SchemaAndValueField expected : list) {
            expected.assertFor(content);
        }
    }

    /**
     * Verifies that rows whose insert statements carry two 4000-character values are captured
     * both by the snapshot and while streaming.
     *
     * <p>One such row is inserted before the connector starts and checked after the snapshot.
     * Ten more rows (ids 2 to 11) are then inserted in a single transaction and the ten streamed
     * records are compared, in order, with the generated values. The table is dropped from the
     * callback passed to {@code stopConnector}.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2920")
    public void shouldStreamDdlThatExceeds4000() throws Exception {
        TestHelper.dropTable(connection, "large_dml");
        connection.execute("CREATE TABLE large_dml (id NUMERIC(6), value varchar2(4000), value2 varchar2(4000), primary key(id))");
        connection.execute("GRANT SELECT ON debezium.large_dml TO " + TestHelper.getConnectorUserName());
        connection.execute("ALTER TABLE debezium.large_dml ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

        final String snapshotValue = generateAlphaNumericStringColumn(4000);
        final String snapshotValue2 = generateAlphaNumericStringColumn(4000);
        connection.execute("INSERT INTO large_dml (id, value, value2) values (1, '" + snapshotValue + "', '" + snapshotValue2 + "')");
        connection.commit();

        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.large_dml")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(OracleConnector.class, config);
        assertNoRecordsToConsume();

        // Snapshot phase: the single pre-existing row.
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(snapshotRecords.topics()).hasSize(1);
        final List<SourceRecord> snapshotTopic = snapshotRecords.recordsForTopic("server1.DEBEZIUM.LARGE_DML");
        Assertions.assertThat(snapshotTopic).hasSize(1);
        final Struct snapshotAfter = ((Struct) snapshotTopic.get(0).value()).getStruct("after");
        Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(snapshotAfter.get("VALUE")).isEqualTo(snapshotValue);
        Assertions.assertThat(snapshotAfter.get("VALUE2")).isEqualTo(snapshotValue2);

        // Streaming phase: ten rows in one transaction, ids starting at 2.
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        final int rowCount = 10;
        final List<String> values = new ArrayList<>();
        final List<String> values2 = new ArrayList<>();
        for (int i = 0; i < rowCount; i++) {
            final String value = generateAlphaNumericStringColumn(4000);
            values.add(value);
            final String value2 = generateAlphaNumericStringColumn(4000);
            values2.add(value2);
            connection.execute("INSERT INTO large_dml (id, value, value2) values (" + (2 + i) + ", '" + value + "', '" + value2 + "')");
        }
        connection.commit();

        final AbstractConnectorTest.SourceRecords streamedRecords = consumeRecordsByTopic(rowCount);
        Assertions.assertThat(streamedRecords.topics()).hasSize(1);
        final List<SourceRecord> streamedTopic = streamedRecords.recordsForTopic("server1.DEBEZIUM.LARGE_DML");
        Assertions.assertThat(streamedTopic).hasSize(rowCount);
        for (int i = 0; i < rowCount; i++) {
            final Struct after = ((Struct) streamedTopic.get(i).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2 + i);
            Assertions.assertThat(after.get("VALUE")).isEqualTo(values.get(i));
            Assertions.assertThat(after.get("VALUE2")).isEqualTo(values2.get(i));
        }

        // The callback's boolean argument is not needed for the cleanup.
        stopConnector(unused -> TestHelper.dropTable(connection, "large_dml"));
    }

    /**
     * Verifies that ten individually committed inserts are all delivered when the connector
     * runs with a queue size of 2 and a batch size of 1.
     *
     * <p>The inherited {@code pollTimeoutInMs} is raised to 20 seconds for the duration of the
     * test and restored afterwards, together with dropping the test table, regardless of the
     * outcome.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-2891")
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.XSTREAM, reason = "Only applies to Xstreams")
    public void shouldNotObserveDeadlockWhileStreamingWithXstream() throws Exception {
        final long originalPollTimeoutInMs = this.pollTimeoutInMs;
        TestHelper.dropTable(connection, "deadlock_test");
        try {
            connection.execute("CREATE TABLE deadlock_test (id numeric(9), name varchar2(50), primary key(id))");
            connection.execute("GRANT SELECT ON debezium.deadlock_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.deadlock_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            this.pollTimeoutInMs = TimeUnit.SECONDS.toMillis(20L);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.deadlock_test")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.MAX_QUEUE_SIZE, 2)
                    .with(RelationalDatabaseConnectorConfig.MAX_BATCH_SIZE, 1)
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Every insert is committed on its own.
            for (int i = 0; i < 10; i++) {
                connection.execute("INSERT INTO deadlock_test (id, name) values (" + i + ", 'Test " + i + "')");
                connection.execute("COMMIT");
            }

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(10, 24);
            Assertions.assertThat(records.topics()).hasSize(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.DEADLOCK_TEST")).hasSize(10);
        } finally {
            // Restore the shared timeout first, then drop the table exactly once.
            this.pollTimeoutInMs = originalPollTimeoutInMs;
            TestHelper.dropTable(connection, "deadlock_test");
        }
    }

    /**
     * Verifies that the connector reaches the streaming phase when the captured table has a name
     * that must be quoted ({@code #T70_Sid:582003931_1_ConnConne}).
     *
     * <p>The table is created as a global temporary table. The test only asserts that the
     * connector is running and that streaming starts; no records are consumed.
     *
     * @throws Exception if a database statement or a connector interaction fails
     */
    @Test
    @FixFor("DBZ-3057")
    public void shouldReadTableUniqueIndicesWithCharactersThatRequireExplicitQuotes() throws Exception {
        // Quoted identifier, reused by every statement below.
        final String tableName = "debezium.\"#T70_Sid:582003931_1_ConnConne\"";
        try {
            TestHelper.dropTable(connection, tableName);
            connection.execute("CREATE GLOBAL TEMPORARY TABLE " + tableName + " (id number, name varchar2(50))");
            connection.execute("GRANT SELECT ON " + tableName + " TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE " + tableName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.\\#T70_Sid\\:582003931_1_ConnConne")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        } finally {
            // Drop the table exactly once, whether the test passed or failed.
            TestHelper.dropTable(connection, tableName);
        }
    }

    /**
     * Checks that the connector gets through the snapshot and into streaming for a table created
     * as {@code XML_TABLE of XMLTYPE}.
     *
     * <p>One row is inserted and committed before the connector starts in {@code INITIAL} snapshot
     * mode. The test asserts that no records are available to consume and then waits for streaming
     * to be reported as running.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3151"})
    public void testSnapshotCompletesWithSystemGeneratedUniqueIndexOnKeylessTable() throws Exception {
        TestHelper.dropTable(connection, "XML_TABLE");
        try {
            connection.execute(new String[]{"CREATE TABLE XML_TABLE of XMLTYPE"});
            connection.execute(new String[]{"GRANT SELECT ON DEBEZIUM.XML_TABLE TO " + TestHelper.getConnectorUserName()});
            connection.execute(new String[]{"ALTER TABLE DEBEZIUM.XML_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
            connection.execute(new String[]{"INSERT INTO DEBEZIUM.XML_TABLE values (xmltype('<?xml version=\"1.0\"?><tab><name>Hi</name></tab>'))"});
            connection.execute(new String[]{"COMMIT"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.XML_TABLE")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "XML_TABLE");
        }
    }

    /**
     * Checks that the shared test connection reports a database version whose major component is
     * a positive number.
     *
     * @throws Exception if the version cannot be read from the connection
     */
    @Test
    @FixFor({"DBZ-3001"})
    public void shouldGetOracleDatabaseVersion() throws Exception {
        final OracleDatabaseVersion databaseVersion = connection.getOracleVersion();

        // A version object must be returned ...
        Assertions.assertThat(databaseVersion).isNotNull();
        // ... and its major component must be greater than zero.
        Assertions.assertThat(databaseVersion.getMajor()).isGreaterThan(0);
    }

    /**
     * Checks that a change is streamed for a table that has two supplemental log groups defined,
     * {@code (ALL) COLUMNS} and {@code (PRIMARY KEY) COLUMNS}.
     *
     * <p>After streaming is running, one row is inserted and committed, and exactly one record is
     * expected on the table's topic.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3109"})
    public void shouldStreamChangesForTableWithMultipleLogGroupTypes() throws Exception {
        try {
            TestHelper.dropTable(connection, "log_group_test");
            connection.execute(new String[]{"CREATE TABLE log_group_test (id numeric(9,0) primary key, name varchar2(50))"});
            connection.execute(new String[]{"GRANT SELECT ON debezium.log_group_test TO " + TestHelper.getConnectorUserName()});
            connection.execute(new String[]{"ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
            connection.execute(new String[]{"ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.LOG_GROUP_TEST")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            connection.execute(new String[]{"INSERT INTO debezium.log_group_test (id, name) values (1,'Test')"});
            connection.execute(new String[]{"COMMIT"});

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.LOG_GROUP_TEST")).hasSize(1);
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "log_group_test");
        }
    }

    /**
     * Checks that a restarted connector resumes streaming without replaying or skipping changes.
     *
     * <p>The connector is started in {@code SCHEMA_ONLY} snapshot mode, one row (id 1, 'Bob') is
     * inserted and its single event verified. The connector is then stopped and started again with
     * the same configuration; no records may be pending, and a second insert (id 2, 'Bill') must
     * produce exactly one event.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-2875"})
    public void shouldResumeStreamingAtCorrectScnOffset() throws Exception {
        final String topic = "server1.DEBEZIUM.OFFSET_TEST";
        TestHelper.dropTable(connection, "offset_test");
        try {
            // NOTE(review): debug output is switched on here and never switched off in this
            // method - confirm this is intentional and not a leftover.
            Testing.Debug.enable();

            connection.execute(new String[]{"CREATE TABLE offset_test (id numeric(9,0) primary key, name varchar2(50))"});
            connection.execute(new String[]{"GRANT SELECT ON debezium.offset_test TO " + TestHelper.getConnectorUserName()});
            connection.execute(new String[]{"ALTER TABLE debezium.offset_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});

            // The same configuration is used for both connector runs.
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.OFFSET_TEST")
                    .build();

            // First run: one insert, one event.
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute(new String[]{"INSERT INTO debezium.offset_test (id, name) values (1, 'Bob')"});

            AbstractConnectorTest.SourceRecords firstRun = consumeRecordsByTopic(1);
            Assertions.assertThat(firstRun.recordsForTopic(topic)).hasSize(1);
            List<SourceRecord> firstRunRecords = firstRun.allRecordsInOrder();
            Struct firstAfter = ((Struct) firstRunRecords.get(0).value()).getStruct("after");
            Testing.print(firstAfter);
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Bob");

            // Second run: nothing may be pending after the restart, then one new event.
            stopConnector();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute(new String[]{"INSERT INTO debezium.offset_test (id, name) values (2, 'Bill')"});

            AbstractConnectorTest.SourceRecords secondRun = consumeRecordsByTopic(1);
            Assertions.assertThat(secondRun.recordsForTopic(topic)).hasSize(1);
            List<SourceRecord> secondRunRecords = secondRun.allRecordsInOrder();
            Struct secondAfter = ((Struct) secondRunRecords.get(0).value()).getStruct("after");
            Testing.print(secondAfter);
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(secondAfter.get("NAME")).isEqualTo("Bill");
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "offset_test");
        }
    }

    /**
     * Checks snapshot and streaming for an index-organized table created with an
     * {@code OVERFLOW} segment.
     *
     * <p>One row exists before the connector starts and must arrive as a read event; a second row
     * inserted once streaming is running must arrive as an insert event. The table include list is
     * the pattern {@code (.)*IOT(.)*} within schema {@code DEBEZIUM}, and exactly one record is
     * expected per phase.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3036"})
    public void shouldHandleParentChildIndexOrganizedTables() throws Exception {
        final String topic = "server1.DEBEZIUM.TEST_IOT";
        TestHelper.dropTable(connection, "test_iot");
        try {
            connection.execute(new String[]{"CREATE TABLE test_iot (id numeric(9,0), description varchar2(50) not null, primary key(id)) ORGANIZATION INDEX INCLUDING description OVERFLOW"});
            TestHelper.streamTable(connection, "debezium.test_iot");
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.test_iot VALUES ('1', 'Hello World')"});
            connection.execute(new String[]{"COMMIT"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "(.)*IOT(.)*")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase: the pre-existing row is emitted as a read event.
            AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            List<SourceRecord> snapshotRecords = snapshot.recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            SourceRecord readRecord = snapshotRecords.get(0);
            Struct readAfter = ((Struct) readRecord.value()).getStruct("after");
            VerifyRecord.isValidRead(readRecord, "ID", 1);
            Assertions.assertThat(readAfter.get("DESCRIPTION")).isEqualTo("Hello World");

            // Streaming phase: a new row is emitted as an insert event.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.test_iot VALUES ('2', 'Goodbye')"});
            connection.execute(new String[]{"COMMIT"});

            AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            List<SourceRecord> streamedRecords = streamed.recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            SourceRecord insertRecord = streamedRecords.get(0);
            Struct insertAfter = ((Struct) insertRecord.value()).getStruct("after");
            VerifyRecord.isValidInsert(insertRecord, "ID", 2);
            Assertions.assertThat(insertAfter.get("DESCRIPTION")).isEqualTo("Goodbye");
        } finally {
            // Single cleanup path: drop the table, then purge the recycle bin, exactly once.
            TestHelper.dropTable(connection, "test_iot");
            TestHelper.purgeRecycleBin(connection);
        }
    }

    /**
     * Checks that {@code CLOB} and {@code NCLOB} column values are captured both by the snapshot
     * and by streaming when {@code LOB_ENABLED} is set to {@code true}.
     *
     * <p>A row inserted before the connector starts must arrive as a read event carrying both
     * values; an update of both columns once streaming is running must arrive as an update event
     * carrying the new values.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3257"})
    public void shouldSnapshotAndStreamClobDataTypes() throws Exception {
        final String topic = "server1.DEBEZIUM.CLOB_TEST";
        TestHelper.dropTable(connection, "clob_test");
        try {
            connection.execute(new String[]{"CREATE TABLE clob_test(id numeric(9,0) primary key, val_clob clob, val_nclob nclob)"});
            TestHelper.streamTable(connection, "clob_test");
            connection.execute(new String[]{"INSERT INTO clob_test values (1, 'TestClob', 'TestNClob')"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase.
            AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshot.recordsForTopic(topic)).hasSize(1);
            List<SourceRecord> snapshotRecords = snapshot.recordsForTopic(topic);
            SourceRecord readRecord = snapshotRecords.get(0);
            VerifyRecord.isValidRead(readRecord, "ID", 1);
            Struct readAfter = ((Struct) readRecord.value()).getStruct("after");
            Assertions.assertThat(readAfter.get("VAL_CLOB")).isEqualTo("TestClob");
            Assertions.assertThat(readAfter.get("VAL_NCLOB")).isEqualTo("TestNClob");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute(new String[]{"UPDATE clob_test SET val_clob = 'TestClob2', val_nclob = 'TestNClob2' WHERE ID = 1"});

            AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            Assertions.assertThat(streamed.recordsForTopic(topic)).hasSize(1);
            List<SourceRecord> streamedRecords = streamed.recordsForTopic(topic);
            SourceRecord updateRecord = streamedRecords.get(0);
            VerifyRecord.isValidUpdate(updateRecord, "ID", 1);
            Struct updateAfter = ((Struct) updateRecord.value()).getStruct("after");
            Assertions.assertThat(updateAfter.get("VAL_CLOB")).isEqualTo("TestClob2");
            Assertions.assertThat(updateAfter.get("VAL_NCLOB")).isEqualTo("TestNClob2");
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "clob_test");
        }
    }

    /**
     * Checks that the first record on the topic named {@code TestHelper.SERVER_NAME} carries a
     * source partition equal to {@code {"server": TestHelper.SERVER_NAME}}.
     *
     * <p>The connector runs with {@code INCLUDE_SCHEMA_CHANGES} enabled and the
     * {@code online_catalog} mining strategy. Per the test name, the record on that topic is
     * expected to be the schema change event for the table.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3347"})
    public void shouldContainPartitionInSchemaChangeEvent() throws Exception {
        TestHelper.dropTable(connection, "dbz3347");
        try {
            connection.execute(new String[]{"create table dbz3347 (id number primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz3347");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3347")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            List<SourceRecord> serverTopicRecords = records.recordsForTopic(TestHelper.SERVER_NAME);
            Assertions.assertThat(serverTopicRecords.get(0).sourcePartition())
                    .isEqualTo(Collections.singletonMap("server", TestHelper.SERVER_NAME));
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "dbz3347");
        }
    }

    /**
     * Checks that a table without a primary key is captured by both the snapshot and streaming,
     * and that the emitted records have a {@code null} key.
     *
     * <p>One row exists before the connector starts and a second is inserted once streaming is
     * running; each phase must yield exactly one record with the expected {@code ID} and
     * {@code DATA} values.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-832"})
    public void shouldSnapshotAndStreamTablesWithNoPrimaryKey() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ832";
        TestHelper.dropTable(connection, "dbz832");
        try {
            connection.execute(new String[]{"create table dbz832 (id numeric(9,0), data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz832");
            connection.execute(new String[]{"INSERT INTO dbz832 values (1, 'Test')"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ832")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase.
            AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            List<SourceRecord> snapshotRecords = snapshot.recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            SourceRecord snapshotRecord = snapshotRecords.get(0);
            Assertions.assertThat(snapshotRecord.key()).isNull();
            Struct snapshotAfter = ((Struct) snapshotRecord.value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("DATA")).isEqualTo("Test");

            // Streaming phase.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.execute(new String[]{"INSERT INTO dbz832 values (2, 'Test2')"});

            AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            List<SourceRecord> streamedRecords = streamed.recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            SourceRecord streamedRecord = streamedRecords.get(0);
            Assertions.assertThat(streamedRecord.key()).isNull();
            Struct streamedAfter = ((Struct) streamedRecord.value()).getStruct("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("DATA")).isEqualTo("Test2");
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "dbz832");
        }
    }

    /**
     * Checks that an insert rejected by a unique index does not produce a change event.
     *
     * <p>Two rows with the same {@code id} are inserted in one transaction into a table that has a
     * unique index on {@code id}. A {@link SQLException} whose message starts with
     * {@code ORA-00001} is tolerated; any other {@code SQLException} is rethrown. The transaction
     * is committed in every case. Exactly one event (id 1, 'Test1') is expected and nothing more.
     *
     * @throws Exception if a database statement fails for a reason other than {@code ORA-00001},
     *                   or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3322"})
    public void shouldNotEmitEventsOnConstraintViolations() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ3322";
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))"});
            connection.execute(new String[]{"CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)"});
            TestHelper.streamTable(connection, "dbz3322");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            try {
                connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3322 (id,data) values (1, 'Test1')"});
                connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3322 (id,data) values (1, 'Test2')"});
            } catch (SQLException e) {
                // Only the unique-constraint violation is tolerated; a missing message cannot be
                // identified as ORA-00001, so it is rethrown as well.
                String message = e.getMessage();
                if (message == null || !message.startsWith("ORA-00001")) {
                    throw e;
                }
            } finally {
                // Commit exactly once, whichever way the inserts ended.
                connection.executeWithoutCommitting(new String[]{"COMMIT"});
            }

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            List<SourceRecord> tableRecords = records.recordsForTopic(topic);
            Assertions.assertThat(tableRecords).hasSize(1);
            Struct after = ((Struct) tableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Checks that rows inserted in a transaction that is rolled back do not produce change events.
     *
     * <p>Rows with ids 1 and 2 are inserted and rolled back; a row with id 3 is then inserted and
     * committed. Exactly one event, for id 3, is expected and nothing more.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3322"})
    public void shouldNotEmitEventsInRollbackTransaction() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ3322";
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))"});
            connection.execute(new String[]{"CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)"});
            TestHelper.streamTable(connection, "dbz3322");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Rolled-back transaction: must not surface.
            connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3322 (id,data) values (1, 'Test')"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3322 (id,data) values (2, 'Test')"});
            connection.executeWithoutCommitting(new String[]{"ROLLBACK"});

            // Committed transaction: must surface as the only event.
            connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3322 (id,data) values (3, 'Test')"});
            connection.executeWithoutCommitting(new String[]{"COMMIT"});

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            List<SourceRecord> tableRecords = records.recordsForTopic(topic);
            Assertions.assertThat(tableRecords).hasSize(1);
            Struct after = ((Struct) tableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            assertNoRecordsToConsume();
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Checks that {@code SNAPSHOT_MODE_TABLES} limits which included tables are snapshotted while
     * all included tables are still streamed.
     *
     * <p>Both {@code dbz3062a} and {@code dbz3062b} match the table include list, but only
     * {@code DBZ3062A} matches the snapshot table pattern. The snapshot must yield one record for
     * table A and no topic entry for table B. After streaming is running, one insert into each
     * table must yield one record on each topic.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3062"})
    public void shouldSelectivelySnapshotTables() throws Exception {
        final String topicA = "server1.DEBEZIUM.DBZ3062A";
        final String topicB = "server1.DEBEZIUM.DBZ3062B";
        TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz3062a (id number(9,0), data varchar2(50))"});
            connection.execute(new String[]{"CREATE TABLE dbz3062b (id number(9,0), data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz3062a");
            TestHelper.streamTable(connection, "dbz3062b");
            connection.execute(new String[]{"INSERT INTO dbz3062a VALUES (1, 'Test1')"});
            connection.execute(new String[]{"INSERT INTO dbz3062b VALUES (2, 'Test2')"});

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3062.*")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*DEBEZIUM\\.DBZ3062A")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Snapshot phase: only table A is snapshotted.
            AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            List<SourceRecord> snapshotA = snapshot.recordsForTopic(topicA);
            List<SourceRecord> snapshotB = snapshot.recordsForTopic(topicB);
            Assertions.assertThat(snapshotA).hasSize(1);
            Assertions.assertThat(snapshotB).isNull();
            Struct snapshotAfter = ((Struct) snapshotA.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("DATA")).isEqualTo("Test1");
            assertNoRecordsToConsume();

            // Streaming phase: both tables are captured.
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3062a VALUES (3, 'Test3')"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3062b VALUES (4, 'Test4')"});
            connection.commit();

            AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(2);
            List<SourceRecord> streamedA = streamed.recordsForTopic(topicA);
            List<SourceRecord> streamedB = streamed.recordsForTopic(topicB);
            Assertions.assertThat(streamedA).hasSize(1);
            Assertions.assertThat(streamedB).hasSize(1);

            Struct afterA = ((Struct) streamedA.get(0).value()).getStruct("after");
            Assertions.assertThat(afterA.get("ID")).isEqualTo(3);
            Assertions.assertThat(afterA.get("DATA")).isEqualTo("Test3");

            Struct afterB = ((Struct) streamedB.get(0).value()).getStruct("after");
            Assertions.assertThat(afterB.get("ID")).isEqualTo(4);
            Assertions.assertThat(afterB.get("DATA")).isEqualTo("Test4");
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        }
    }

    /**
     * Checks that no warning containing {@code "was already processed, ignore."} is logged when
     * two transactions from different connections overlap and commit in a different order than
     * they started.
     *
     * <p>A second connection inserts id 1 without committing; the shared connection then inserts
     * id 2 and commits. After a one-minute pause the second connection commits. Two events are
     * expected, id 2 first and id 1 second.
     *
     * @throws Exception if a database statement fails or a connector wait does not complete
     */
    @Test
    @FixFor({"DBZ-3616"})
    public void shouldNotLogWarningsAboutCommittedTransactionsWhileStreamingNormally() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ3616";
        LogInterceptor logInterceptor = new LogInterceptor();
        // The table name was previously passed twice to dropTables; one drop is sufficient.
        TestHelper.dropTable(connection, "dbz3616");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz3616 (id number(9,0), data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz3616");
            connection.commit();

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3616.*")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // The second connection is closed when this block exits instead of being leaked.
            try (OracleConnection secondConnection = TestHelper.testConnection()) {
                secondConnection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3616 (id,data) values (1,'Conn2')"});
                connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3616 (id,data) values (2,'Conn1')"});
                connection.commit();

                // Always-true condition with a one-minute poll delay: effectively a one-minute
                // pause. Presumably this lets the connector process the committed transaction
                // while the other is still open - confirm against DBZ-3616.
                Awaitility.await().pollDelay(Durations.ONE_MINUTE).timeout(Durations.TWO_MINUTES).until(() -> true);

                secondConnection.commit();
            }

            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
            Assertions.assertThat(records.recordsForTopic(topic)).hasSize(2);
            List<SourceRecord> tableRecords = records.recordsForTopic(topic);
            Struct firstAfter = ((Struct) tableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(firstAfter.get("ID")).isEqualTo(2);
            Struct secondAfter = ((Struct) tableRecords.get(1).value()).getStruct("after");
            Assertions.assertThat(secondAfter.get("ID")).isEqualTo(1);

            Assertions.assertThat(logInterceptor.containsWarnMessage("was already processed, ignore.")).isFalse();
        } finally {
            // Single cleanup path: executed exactly once on success and on failure.
            TestHelper.dropTable(connection, "dbz3616");
        }
    }

    /**
     * Builds a random string of the requested length from the characters {@code A-Z},
     * {@code 0-9} and {@code a-z}, each position chosen independently via {@link Math#random()}.
     *
     * @param i number of characters to generate; {@code 0} yields an empty string
     * @return the generated string of length {@code i}
     */
    private String generateAlphaNumericStringColumn(int i) {
        final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
        final StringBuilder column = new StringBuilder(i);
        int remaining = i;
        while (remaining-- > 0) {
            // Math.random() is in [0, 1), so the index is always within the alphabet.
            final int position = (int) (alphabet.length() * Math.random());
            column.append(alphabet.charAt(position));
        }
        return column.toString();
    }

    /**
     * Asserts that the given record was published on the topic
     * {@code __debezium-heartbeat.server1} and that the {@code serverName} field of its key equals
     * {@code TestHelper.SERVER_NAME}.
     *
     * @param sourceRecord the record to check; its key must be a {@link Struct}
     */
    private void verifyHeartbeatRecord(SourceRecord sourceRecord) {
        final String actualTopic = sourceRecord.topic();
        TestCase.assertEquals("__debezium-heartbeat.server1", actualTopic);

        final Struct recordKey = (Struct) sourceRecord.key();
        Assertions.assertThat(recordKey.get("serverName")).isEqualTo(TestHelper.SERVER_NAME);
    }

    /**
     * Converts a date-time, interpreted in UTC, to microseconds since the epoch.
     *
     * <p>Only whole seconds are taken into account: the value is the epoch second multiplied by
     * {@code MICROS_PER_SECOND}, so any nano-of-second part of the input does not contribute.
     *
     * @param localDateTime the date-time to convert
     * @return the epoch second of {@code localDateTime} at UTC, expressed in microseconds
     */
    private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) {
        final long epochSeconds = localDateTime.toEpochSecond(ZoneOffset.UTC);
        return epochSeconds * MICROS_PER_SECOND;
    }
}
