package io.debezium.connector.oracle;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/oracle/CustomSnapshotterIT.class */
public class CustomSnapshotterIT extends AbstractConnectorTest {
    private static OracleConnection connection;
    private static final String PK_FIELD = "PK";

    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
        TestHelper.dropAllTables();
        connection.execute(new String[]{"CREATE TABLE debezium.a (pk numeric(9,0), aa integer, primary key(pk))"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.a to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.a ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        connection.execute(new String[]{"CREATE TABLE debezium.b (pk numeric(9,0), aa integer, PRIMARY KEY(pk))"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.b to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.b ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            TestHelper.dropTable(connection, "debezium.a");
            TestHelper.dropTable(connection, "debezium.b");
            connection.close();
        }
    }

    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @Test
    public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLException {
        connection.execute(new String[]{"INSERT INTO debezium.a (pk, aa) VALUES (1, 1)"});
        connection.execute(new String[]{"INSERT INTO debezium.b (pk, aa) VALUES (1, 1)"});
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A, DEBEZIUM\\.B").with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(OracleConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(OracleConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(OracleConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "A"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(topicName("DEBEZIUM", "B"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), PK_FIELD, 1);
        connection.execute(new String[]{"INSERT INTO debezium.a (pk, aa) VALUES (2, 1)"});
        connection.execute(new String[]{"INSERT INTO debezium.b (pk, aa) VALUES (2, 1)"});
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "A"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(topicName("DEBEZIUM", "B"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), PK_FIELD, 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), PK_FIELD, 2);
        stopConnector();
        start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.A, DEBEZIUM\\.B").with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(OracleConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(OracleConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(OracleConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(4);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "A"));
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic(topicName("DEBEZIUM", "B"));
        Assertions.assertThat(recordsForTopic5.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(0), PK_FIELD, 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(1), PK_FIELD, 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(0), PK_FIELD, 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(1), PK_FIELD, 2);
    }

    private String topicName(String str, String str2) {
        return "server1." + str + "." + str2;
    }
}
