package io.debezium.testing.system.tests.db2;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/debezium/testing/system/tests/db2/Db2Tests.class */
/**
 * Shared system-test scenarios for a DB2 connector running on Kafka Connect.
 *
 * <p>The scenarios are ordered with {@link Order}. The expected record counts on the
 * CUSTOMERS topic grow from one scenario to the next (4, 5, 6, 7), so each scenario
 * appears to rely on the state left behind by the previous ones and they are meant to
 * run in the declared order.
 */
public abstract class Db2Tests extends ConnectorTest {
    /** Database that all SQL statements issued by this class are executed against. */
    private static final String DATABASE = "inventory";

    /** Schema-qualified name of the customers table; also the suffix of its Kafka topic. */
    private static final String CUSTOMERS_TABLE = "DB2INST1.CUSTOMERS";

    /**
     * Passes all collaborators straight through to {@link ConnectorTest}.
     *
     * @param kafkaController controller handed to the superclass
     * @param kafkaConnectController Kafka Connect controller handed to the superclass
     * @param connectorConfigBuilder connector configuration handed to the superclass
     * @param kafkaAssertions Kafka assertions handed to the superclass
     */
    public Db2Tests(KafkaController kafkaController, KafkaConnectController kafkaConnectController, ConnectorConfigBuilder connectorConfigBuilder, KafkaAssertions<?, ?> kafkaAssertions) {
        super(kafkaController, kafkaConnectController, connectorConfigBuilder, kafkaAssertions);
    }

    /**
     * Inserts one row into {@code DB2INST1.CUSTOMERS}.
     *
     * @param sqlDatabaseController controller used to obtain the database client
     * @param firstName value for the {@code first_name} column
     * @param lastName value for the {@code last_name} column
     * @param email value for the {@code email} column
     * @throws SQLException if executing the statement fails
     */
    public void insertCustomer(SqlDatabaseController sqlDatabaseController, String firstName, String lastName, String email) throws SQLException {
        String sql = "INSERT INTO " + CUSTOMERS_TABLE + "(first_name,last_name,email) VALUES  ("
                + sqlLiteral(firstName) + ", " + sqlLiteral(lastName) + ", " + sqlLiteral(email) + ")";
        executeSql(sqlDatabaseController, sql);
    }

    /**
     * Changes {@code first_name} for every customer row that currently has the given first name.
     *
     * @param sqlDatabaseController controller used to obtain the database client
     * @param oldFirstName first name to match in the {@code WHERE} clause
     * @param newFirstName first name to set on the matching rows
     * @throws SQLException if executing the statement fails
     */
    public void renameCustomer(SqlDatabaseController sqlDatabaseController, String oldFirstName, String newFirstName) throws SQLException {
        String sql = "UPDATE " + CUSTOMERS_TABLE + " SET first_name = " + sqlLiteral(newFirstName)
                + " WHERE first_name = " + sqlLiteral(oldFirstName);
        executeSql(sqlDatabaseController, sql);
    }

    /** Runs one SQL statement against the inventory database using the configured DB2 credentials. */
    private void executeSql(SqlDatabaseController sqlDatabaseController, String sql) throws SQLException {
        sqlDatabaseController
                .getDatabaseClient(ConfigProperties.DATABASE_DB2_USERNAME, ConfigProperties.DATABASE_DB2_DBZ_PASSWORD)
                .execute(DATABASE, sql);
    }

    /**
     * Renders a value as a single-quoted SQL string literal.
     *
     * <p>Embedded apostrophes are doubled so that a value such as {@code O'Brien} yields a valid
     * statement. A {@code null} value is rendered as the text {@code 'null'}, which is what plain
     * string concatenation produced before.
     */
    private static String sqlLiteral(String value) {
        return "'" + String.valueOf(value).replace("'", "''") + "'";
    }

    /** Returns the name of the topic carrying CUSTOMERS change events: {@code <dbServerName>.DB2INST1.CUSTOMERS}. */
    private String customersTopic() {
        return connectorConfig.getDbServerName() + "." + CUSTOMERS_TABLE;
    }

    /**
     * Verifies that the {@code /connectors} endpoint of the Kafka Connect API lists the connector's name.
     */
    @Test
    @Order(10)
    public void shouldHaveRegisteredConnector() {
        Request request = new Request.Builder().url(connectController.getApiURL().resolve("/connectors")).build();
        // One client is shared by all retries of the awaited assertion instead of building one per attempt.
        OkHttpClient client = new OkHttpClient();
        KafkaAssertions.awaitAssert(() -> {
            // try-with-resources closes the response whether or not the assertion passes.
            try (Response response = client.newCall(request).execute()) {
                Assertions.assertThat(response.body().string()).contains(connectorConfig.getConnectorName());
            }
        });
    }

    /**
     * Verifies that topics exist for the CUSTOMERS, ORDERS, PRODUCTS and PRODUCTS_ON_HAND tables.
     */
    @Test
    @Order(20)
    public void shouldCreateKafkaTopics() {
        String prefix = connectorConfig.getDbServerName() + ".DB2INST1.";
        assertions.assertTopicsExist(
                prefix + "CUSTOMERS",
                prefix + "ORDERS",
                prefix + "PRODUCTS",
                prefix + "PRODUCTS_ON_HAND");
    }

    /**
     * Waits for the DB2 snapshot to be reported by the metrics reader, then expects 4 records on the CUSTOMERS topic.
     */
    @Test
    @Order(30)
    public void shouldSnapshotChanges() {
        connectController.getMetricsReader().waitForDB2Snapshot(connectorConfig.getDbServerName());
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(topic, 4);
        });
    }

    /**
     * Inserts a customer and expects the CUSTOMERS topic to reach 5 records and to contain the new e-mail address.
     *
     * @param sqlDatabaseController controller used to modify the database
     * @throws SQLException if the insert fails
     */
    @Test
    @Order(40)
    public void shouldStreamChanges(SqlDatabaseController sqlDatabaseController) throws SQLException {
        insertCustomer(sqlDatabaseController, "Tom", "Tester", "tom@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(topic, 5);
        });
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsContain(topic, "tom@test.com");
        });
    }

    /**
     * Renames a customer and expects the update to land on {@code <dbServerName>.u.CUSTOMERS}
     * while the regular CUSTOMERS topic stays at 5 records.
     *
     * @param sqlDatabaseController controller used to modify the database
     * @throws SQLException if the update fails
     */
    @Test
    @Order(41)
    public void shouldRerouteUpdates(SqlDatabaseController sqlDatabaseController) throws SQLException {
        renameCustomer(sqlDatabaseController, "Tom", "Thomas");
        String customersTopic = customersTopic();
        String updatesTopic = connectorConfig.getDbServerName() + ".u.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(customersTopic, 5);
        });
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(updatesTopic, 1);
        });
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsContain(updatesTopic, "Thomas");
        });
    }

    /**
     * Undeploys the connector, inserts a customer and expects the CUSTOMERS topic to stay at 5 records.
     *
     * @param sqlDatabaseController controller used to modify the database
     * @throws Exception if undeploying the connector or the insert fails
     */
    @Test
    @Order(50)
    public void shouldBeDown(SqlDatabaseController sqlDatabaseController) throws Exception {
        connectController.undeployConnector(connectorConfig.getConnectorName());
        insertCustomer(sqlDatabaseController, "Jerry", "Tester", "jerry@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(topic, 5);
        });
    }

    /**
     * Deploys the connector again and expects the CUSTOMERS topic to reach 6 records,
     * including the row inserted while the connector was undeployed.
     *
     * @throws Exception if deploying the connector fails
     */
    @Test
    @Order(60)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        connectController.deployConnector(connectorConfig);
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(topic, 6);
        });
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsContain(topic, "jerry@test.com");
        });
    }

    /**
     * Calls {@code destroy()} on the Kafka Connect controller, inserts a customer and expects
     * the CUSTOMERS topic to stay at 6 records.
     *
     * @param sqlDatabaseController controller used to modify the database
     * @throws SQLException if the insert fails
     */
    @Test
    @Order(70)
    public void shouldBeDownAfterCrash(SqlDatabaseController sqlDatabaseController) throws SQLException {
        connectController.destroy();
        insertCustomer(sqlDatabaseController, "Nibbles", "Tester", "nibbles@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsCount(topic, 6);
        });
    }

    /**
     * Calls {@code restore()} on the Kafka Connect controller and expects the CUSTOMERS topic to hold
     * at least 7 records, including the row inserted after {@code destroy()}.
     *
     * <p>Unlike the earlier scenarios this one asserts a minimal count rather than an exact one.
     *
     * @throws InterruptedException if restoring Kafka Connect is interrupted
     */
    @Test
    @Order(80)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        connectController.restore();
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertMinimalRecordsCount(topic, 7);
        });
        KafkaAssertions.awaitAssert(() -> {
            assertions.assertRecordsContain(topic, "nibbles@test.com");
        });
    }
}
