package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnStrategyRule;
import io.debezium.connector.oracle.junit.SkipWhenLogMiningStrategyIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import java.io.StringReader;
import java.io.StringWriter;
import java.sql.Clob;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import oracle.xdb.XMLType;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Hybrid does not support XML")
/* loaded from: input_file:io/debezium/connector/oracle/OracleXmlDataTypesIT.class */
public class OracleXmlDataTypesIT extends AbstractConnectorTest {
    private static final String XML_DATA = Testing.Files.readResourceAsString("data/test_xml_data_short.xml");
    private static final String XML_DATA2 = Testing.Files.readResourceAsString("data/test_xml_data_short2.xml");
    private static final String XML_LONG_DATA = Testing.Files.readResourceAsString("data/test_xml_data_long.xml");
    private static final String XML_LONG_DATA2 = Testing.Files.readResourceAsString("data/test_xml_data_long2.xml");

    @Rule
    public final TestRule skipStrategyRule = new SkipTestDependingOnStrategyRule();
    private OracleConnection connection;

    @Before
    public void before() {
        this.connection = TestHelper.testConnection();
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws Exception {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldSnapshotTableWithXmlTypeColumnWithSimpleXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            this.connection.execute(new String[]{"insert into dbz3605 values (1, xmltype('<?xml version=\"1.0\"?><warehouse></warehouse>'))"});
            start(OracleConnector.class, getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidRead(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", "<?xml version=\"1.0\"?><warehouse></warehouse>");
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldSnapshotTableWithXmlTypeColumnWithShortXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            String str = XML_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1,xmltype(?))", preparedStatement -> {
                preparedStatement.setObject(1, str);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            start(OracleConnector.class, getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidRead(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldSnapshotTableWithXmlTypeColumnWithLongXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            start(OracleConnector.class, getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidRead(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithXmlTypeColumnWithSimpleXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"insert into dbz3605 values (1, xmltype('<?xml version=\"1.0\"?><warehouse></warehouse>'))"});
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", "<?xml version=\"1.0\"?><warehouse></warehouse>");
            this.connection.execute(new String[]{"UPDATE dbz3605 SET data = xmltype('<?xml version=\"1.0\"?><warehouse><dept>25</dept></warehouse>') WHERE id = 1"});
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, "ID", 1);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", "<?xml version=\"1.0\"?><warehouse><dept>25</dept></warehouse>");
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidDelete(sourceRecord3, "ID", 1);
            Struct before = before(sourceRecord3);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            Assertions.assertThat(after(sourceRecord3)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithXmlTypeColumnWithShortXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1, xmltype(?))", preparedStatement -> {
                preparedStatement.setObject(1, str);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            String str2 = XML_DATA2;
            this.connection.prepareQuery("UPDATE dbz3605 SET data = xmltype(?) WHERE id=1", preparedStatement2 -> {
                preparedStatement2.setObject(1, str2);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, "ID", 1);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", str2);
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidDelete(sourceRecord3, "ID", 1);
            Struct before = before(sourceRecord3);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            Assertions.assertThat(after(sourceRecord3)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithXmlTypeColumnWithLongXmlData() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            String str2 = XML_LONG_DATA2;
            this.connection.prepareQuery("UPDATE dbz3605 SET data = ? WHERE id=1", preparedStatement2 -> {
                preparedStatement2.setObject(1, toXmlType(str2));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, "ID", 1);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", str2);
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidDelete(sourceRecord3, "ID", 1);
            Struct before = before(sourceRecord3);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            Assertions.assertThat(after(sourceRecord3)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithXmlTypeColumnAndOtherNonLobColumns() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, DATA2 varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1,?,'Acme')", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, false);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            Assertions.assertThat(after.get("DATA2")).isEqualTo("Acme");
            String str2 = XML_LONG_DATA2;
            this.connection.prepareQuery("UPDATE dbz3605 SET data = ? WHERE id=1", preparedStatement2 -> {
                preparedStatement2.setObject(1, toXmlType(str2));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, false);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", str2);
            Assertions.assertThat(after2.get("DATA2")).isEqualTo("Acme");
            this.connection.prepareQuery("UPDATE dbz3605 SET data = ?, DATA2 = 'Data' WHERE id=1", preparedStatement3 -> {
                preparedStatement3.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidUpdate(sourceRecord3, false);
            Struct after3 = after(sourceRecord3);
            Assertions.assertThat(after3.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after3, "DATA", str);
            Assertions.assertThat(after3.get("DATA2")).isEqualTo("Data");
            this.connection.execute(new String[]{"UPDATE dbz3605 SET DATA2 = 'Acme' WHERE id=1"});
            List recordsForTopic4 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic4).hasSize(1);
            SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic4.get(0);
            VerifyRecord.isValidUpdate(sourceRecord4, false);
            Struct after4 = after(sourceRecord4);
            Assertions.assertThat(after4.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(after4, "DATA", build);
            Assertions.assertThat(after4.get("DATA2")).isEqualTo("Acme");
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic5 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic5).hasSize(1);
            SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic5.get(0);
            VerifyRecord.isValidDelete(sourceRecord5, false);
            Struct before = before(sourceRecord5);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            Assertions.assertThat(before.get("DATA2")).isEqualTo("Acme");
            Assertions.assertThat(after(sourceRecord5)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithNoPrimaryKeyWithXmlTypeColumn() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype)"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz3605 values (1,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, false);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            String str2 = XML_LONG_DATA2;
            this.connection.prepareQuery("UPDATE dbz3605 SET data = ? WHERE id=1", preparedStatement2 -> {
                preparedStatement2.setObject(1, toXmlType(str2));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, false);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", str2);
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidDelete(sourceRecord3, false);
            Struct before = before(sourceRecord3);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            Assertions.assertThat(after(sourceRecord3)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-3605"})
    public void shouldStreamTableWithXmlTypeColumnAndAnotherLobColumn() throws Exception {
        TestHelper.dropTable(this.connection, "dbz3605");
        try {
            this.connection.execute(new String[]{"CREATE TABLE DBZ3605 (ID numeric(9,0), DATA xmltype, DATA2 clob)"});
            TestHelper.streamTable(this.connection, "dbz3605");
            Configuration build = getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3605").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_LONG_DATA;
            Clob createClob = this.connection.connection().createClob();
            createClob.setString(1L, XML_LONG_DATA);
            this.connection.prepareQuery("insert into dbz3605 values (1,?,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
                preparedStatement.setClob(2, createClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, false);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            Assertions.assertThat(after.get("DATA2")).isEqualTo(createClob.getSubString(1L, (int) createClob.length()));
            String str2 = XML_LONG_DATA2;
            this.connection.prepareQuery("UPDATE dbz3605 SET data = ? WHERE id=1", preparedStatement2 -> {
                preparedStatement2.setObject(1, toXmlType(str2));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, false);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after2, "DATA", str2);
            assertFieldIsUnavailablePlaceholder(after2, "DATA2", build);
            this.connection.execute(new String[]{"DELETE FROM dbz3605 WHERE id = 1"});
            List recordsForTopic3 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ3605"));
            Assertions.assertThat(recordsForTopic3).hasSize(1);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.get(0);
            VerifyRecord.isValidDelete(sourceRecord3, false);
            Struct before = before(sourceRecord3);
            Assertions.assertThat(before.get("ID")).isEqualTo(1);
            assertFieldIsUnavailablePlaceholder(before, "DATA", build);
            assertFieldIsUnavailablePlaceholder(before, "DATA2", build);
            Assertions.assertThat(after(sourceRecord3)).isNull();
            TestHelper.dropTable(this.connection, "dbz3605");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz3605");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-6782"})
    public void shouldProperlyResolveAddedXmlColumnTypeAndStreamChanges() throws Exception {
        TestHelper.dropTable(this.connection, "dbz6782");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz6782 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz6782");
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz6782 values (1,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            start(OracleConnector.class, getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6782").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true").with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, "true").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"ALTER TABLE dbz6782 add DATA2 xmltype"});
            String str2 = XML_LONG_DATA2;
            this.connection.prepareQuery("insert into dbz6782 values (2,?,?)", preparedStatement2 -> {
                preparedStatement2.setObject(1, toXmlType(str));
                preparedStatement2.setObject(2, toXmlType(str2));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
            List recordsForTopic = consumeRecordsByTopic.recordsForTopic(topicName("DBZ6782"));
            Assertions.assertThat(recordsForTopic).hasSize(2);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidRead(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
            VerifyRecord.isValidInsert(sourceRecord2, "ID", 2);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(2);
            assertXmlFieldIsEqual(after2, "DATA", str);
            assertXmlFieldIsEqual(after2, "DATA2", str2);
            List array = ((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.SERVER_NAME).get(1)).value()).getArray("tableChanges");
            Assertions.assertThat(array).hasSize(1);
            Struct struct = (Struct) array.get(0);
            Assertions.assertThat(struct.getString("type")).isEqualTo("ALTER");
            Assertions.assertThat(struct.getString("id")).contains(new CharSequence[]{"\"DBZ6782\""});
            for (Struct struct2 : struct.getStruct("table").getArray("columns")) {
                if (struct2.getString("name").startsWith("DATA")) {
                    Assertions.assertThat(struct2.get("jdbcType")).isEqualTo(2009);
                    Assertions.assertThat(struct2.get("typeName")).isEqualTo("XMLTYPE");
                    Assertions.assertThat(struct2.get("typeExpression")).isEqualTo("XMLTYPE");
                }
            }
            assertNoRecordsToConsume();
            TestHelper.dropTable(this.connection, "dbz6782");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz6782");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7489"})
    public void shouldHandleStreamingSettingXmlColumnToNull() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7489");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz7489 (ID numeric(9,0), DATA xmltype, primary key(ID))"});
            TestHelper.streamTable(this.connection, "dbz7489");
            start(OracleConnector.class, getDefaultXmlConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7489").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            String str = XML_LONG_DATA;
            this.connection.prepareQuery("insert into dbz7489 values (1,?)", preparedStatement -> {
                preparedStatement.setObject(1, toXmlType(str));
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ7489"));
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct after = after(sourceRecord);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            assertXmlFieldIsEqual(after, "DATA", str);
            this.connection.execute(new String[]{"UPDATE dbz7489 SET data = NULL where id = 1"});
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic(topicName("DBZ7489"));
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, "ID", 1);
            Struct after2 = after(sourceRecord2);
            Assertions.assertThat(after2.get("ID")).isEqualTo(1);
            Assertions.assertThat(after2.get("DATA")).isNull();
            TestHelper.dropTable(this.connection, "dbz7489");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz7489");
            throw th;
        }
    }

    private Configuration.Builder getDefaultXmlConfig() {
        return TestHelper.defaultConfig().with(OracleConnectorConfig.LOB_ENABLED, true);
    }

    private XMLType toXmlType(String str) throws SQLException {
        return XMLType.createXML(this.connection.connection(), str, "oracle.xml.parser.XMLDocument.THIN");
    }

    private static void assertFieldIsUnavailablePlaceholder(Struct struct, String str, Configuration configuration) {
        Assertions.assertThat(struct.getString(str)).isEqualTo(configuration.getString(OracleConnectorConfig.UNAVAILABLE_VALUE_PLACEHOLDER));
    }

    private static void assertXmlFieldIsEqual(Struct struct, String str, String str2) {
        Assertions.assertThat(formatToOracleXml(struct.getString(str))).isEqualTo(formatToOracleXml(str2));
    }

    private static String formatToOracleXml(String str) {
        if (str == null) {
            return null;
        }
        try {
            Transformer newTransformer = TransformerFactory.newInstance().newTransformer(new StreamSource(Testing.Files.readResourceAsStream("xml-format.xslt")));
            StreamSource streamSource = new StreamSource(new StringReader(str));
            StreamResult streamResult = new StreamResult(new StringWriter());
            newTransformer.transform(streamSource, streamResult);
            return streamResult.getWriter().toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to parse XML: " + str, e);
        }
    }

    private static String topicName(String str) {
        return "server1.DEBEZIUM." + str;
    }

    private static Struct before(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("before");
    }

    private static Struct after(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("after");
    }
}
