package io.debezium.connector.oracle;

import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/oracle/TransactionMetadataIT.class */
public class TransactionMetadataIT extends AbstractConnectorTest {
    private static OracleConnection connection;

    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
        TestHelper.dropTable(connection, "debezium.customer");
        TestHelper.dropTable(connection, "debezium.orders");
        connection.execute(new String[]{"create table debezium.customer (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.customer to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.customer ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        connection.execute(new String[]{"create table debezium.orders ( id number(6) not null primary key,  order_date date not null,  purchaser number(4) not null,  quantity number(4) not null,  product_id number(4) not null)"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.orders to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            TestHelper.dropTable(connection, "debezium.orders");
            TestHelper.dropTable(connection, "debezium.customer");
            connection.close();
        }
    }

    @Before
    public void before() throws SQLException {
        connection.execute(new String[]{"delete from debezium.customer"});
        connection.execute(new String[]{"delete from debezium.orders"});
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    @Test
    public void transactionMetadata() throws Exception {
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS").with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(OracleConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.orders VALUES (1, TO_DATE('2021-02-01', 'yyyy-mm-dd'), 1001, 1, 102)"});
        connection.execute(new String[]{"COMMIT"});
        List allRecordsInOrder = consumeRecordsByTopic(4).allRecordsInOrder();
        Assertions.assertThat(allRecordsInOrder).hasSize(4);
        String assertBeginTransaction = assertBeginTransaction((SourceRecord) allRecordsInOrder.get(0));
        SourceRecord sourceRecord = (SourceRecord) allRecordsInOrder.get(1);
        VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
        Struct struct = (Struct) ((Struct) sourceRecord.value()).get("after");
        Assertions.assertThat(struct.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(struct.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
        assertRecordTransactionMetadata(sourceRecord, assertBeginTransaction, 1L, 1L);
        SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(2);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
        Struct struct2 = (Struct) ((Struct) sourceRecord2.value()).get("after");
        Assertions.assertThat(struct2.get("ID")).isEqualTo(1);
        Assertions.assertThat(struct2.get("ORDER_DATE")).isEqualTo(1612137600000L);
        Assertions.assertThat(struct2.get("PURCHASER")).isEqualTo((short) 1001);
        Assertions.assertThat(struct2.get("QUANTITY")).isEqualTo((short) 1);
        Assertions.assertThat(struct2.get("PRODUCT_ID")).isEqualTo((short) 102);
        assertRecordTransactionMetadata(sourceRecord2, assertBeginTransaction, 2L, 1L);
        SourceRecord sourceRecord3 = (SourceRecord) allRecordsInOrder.get(3);
        String databaseName = TestHelper.getDatabaseName();
        assertEndTransaction(sourceRecord3, assertBeginTransaction, 2L, Collect.hashMapOf(databaseName + ".DEBEZIUM.CUSTOMER", 1, databaseName + ".DEBEZIUM.ORDERS", 1));
    }

    @Test
    @FixFor({"DBZ-3090"})
    public void transactionMetadataMultipleTransactions() throws Exception {
        OracleConnection testConnection = TestHelper.testConnection();
        try {
            String databaseName = TestHelper.getDatabaseName();
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER,DEBEZIUM\\.ORDERS").with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(OracleConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG).build());
            assertConnectorIsRunning();
            waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.orders VALUES (2, TO_DATE('2021-02-01', 'yyyy-mm-dd'), 1001, 2, 102)"});
            testConnection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.orders VALUES (1, TO_DATE('2021-02-01', 'yyyy-mm-dd'), 1001, 1, 102)"});
            testConnection.execute(new String[]{"COMMIT"});
            connection.execute(new String[]{"COMMIT"});
            List allRecordsInOrder = consumeRecordsByTopic(7).allRecordsInOrder();
            Assertions.assertThat(allRecordsInOrder).hasSize(7);
            String assertBeginTransaction = assertBeginTransaction((SourceRecord) allRecordsInOrder.get(0));
            SourceRecord sourceRecord = (SourceRecord) allRecordsInOrder.get(1);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct struct = (Struct) ((Struct) sourceRecord.value()).get("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("ORDER_DATE")).isEqualTo(1612137600000L);
            Assertions.assertThat(struct.get("PURCHASER")).isEqualTo((short) 1001);
            Assertions.assertThat(struct.get("QUANTITY")).isEqualTo((short) 1);
            Assertions.assertThat(struct.get("PRODUCT_ID")).isEqualTo((short) 102);
            assertRecordTransactionMetadata(sourceRecord, assertBeginTransaction, 1L, 1L);
            assertEndTransaction((SourceRecord) allRecordsInOrder.get(2), assertBeginTransaction, 1L, Collect.hashMapOf(databaseName + ".DEBEZIUM.ORDERS", 1));
            String assertBeginTransaction2 = assertBeginTransaction((SourceRecord) allRecordsInOrder.get(3));
            SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(4);
            VerifyRecord.isValidInsert(sourceRecord2, "ID", 1);
            Struct struct2 = (Struct) ((Struct) sourceRecord2.value()).get("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct2.get("NAME")).isEqualTo("Billie-Bob");
            Assertions.assertThat(struct2.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56d));
            assertRecordTransactionMetadata(sourceRecord2, assertBeginTransaction2, 1L, 1L);
            SourceRecord sourceRecord3 = (SourceRecord) allRecordsInOrder.get(5);
            VerifyRecord.isValidInsert(sourceRecord3, "ID", 2);
            Struct struct3 = (Struct) ((Struct) sourceRecord3.value()).get("after");
            Assertions.assertThat(struct3.get("ID")).isEqualTo(2);
            Assertions.assertThat(struct3.get("ORDER_DATE")).isEqualTo(1612137600000L);
            Assertions.assertThat(struct3.get("PURCHASER")).isEqualTo((short) 1001);
            Assertions.assertThat(struct3.get("QUANTITY")).isEqualTo((short) 2);
            Assertions.assertThat(struct3.get("PRODUCT_ID")).isEqualTo((short) 102);
            assertRecordTransactionMetadata(sourceRecord3, assertBeginTransaction2, 2L, 1L);
            assertEndTransaction((SourceRecord) allRecordsInOrder.get(6), assertBeginTransaction2, 2L, Collect.hashMapOf(databaseName + ".DEBEZIUM.CUSTOMER", 1, databaseName + ".DEBEZIUM.ORDERS", 1));
            if (testConnection != null) {
                testConnection.close();
            }
        } catch (Throwable th) {
            if (testConnection != null) {
                try {
                    testConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
