package io.debezium.testing.system.tests.mysql;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tools.WaitConditions;
import io.debezium.testing.system.tools.databases.mysql.MySqlController;
import io.debezium.testing.system.tools.databases.mysql.MySqlReplicaController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/debezium/testing/system/tests/mysql/MySqlOcpTests.class */
/**
 * MySQL system tests that use a replica database next to the primary one.
 *
 * <p>Each test points the connector at a database host, inserts a customer through the primary
 * controller and then waits for the {@code <db server name>.inventory.customers} topic to reach an
 * expected record count and to contain the inserted e-mail address.
 *
 * <p>NOTE(review): the expected counts (9, 10, 11) look cumulative, so the tests presumably rely on
 * running in {@link Order} sequence after the tests inherited from {@link MySqlTests} — confirm
 * against the fixture data.
 */
public abstract class MySqlOcpTests extends MySqlTests {

    /**
     * Hands every collaborator straight to the {@link MySqlTests} constructor.
     *
     * @param kafkaController        forwarded to the superclass
     * @param kafkaConnectController forwarded to the superclass
     * @param connectorConfigBuilder forwarded to the superclass
     * @param kafkaAssertions        forwarded to the superclass
     */
    public MySqlOcpTests(KafkaController kafkaController, KafkaConnectController kafkaConnectController, ConnectorConfigBuilder connectorConfigBuilder, KafkaAssertions<?, ?> kafkaAssertions) {
        super(kafkaController, kafkaConnectController, connectorConfigBuilder, kafkaAssertions);
    }

    /**
     * Deploys the connector against the replica's hostname and checks that a customer inserted
     * through the primary controller shows up on the customers topic.
     *
     * @param mySqlReplicaController controller of the replica database
     * @param mySqlController        controller of the primary database
     */
    @Test
    @Order(100)
    public void shouldStreamFromReplica(MySqlReplicaController mySqlReplicaController, MySqlController mySqlController) throws InterruptedException, IOException, SQLException {
        // Do not touch the connector until both databases report the same customer count.
        awaitReplicaCaughtUp(mySqlReplicaController, mySqlController);

        connectorConfig.put("database.hostname", mySqlReplicaController.getDatabaseHostname());
        connectController.deployConnector(connectorConfig);

        insertCustomer(mySqlController, "Arnold", "Test", "atest@test.com");

        final String customersTopic = connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic, 9));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic, "atest@test.com"));
    }

    /**
     * Runs two phases against the same customers topic:
     * <ol>
     *   <li>connector deployed with the primary's hostname, one customer inserted and asserted;</li>
     *   <li>after the replica reports the same customer count, {@code reload()} is called on the
     *       primary controller, the connector is redeployed with the replica's hostname, and a second
     *       customer is inserted and asserted.</li>
     * </ol>
     *
     * @param mySqlReplicaController controller of the replica database
     * @param mySqlController        controller of the primary database
     */
    @Test
    @Order(110)
    public void shouldStreamAfterMasterRestart(MySqlReplicaController mySqlReplicaController, MySqlController mySqlController) throws SQLException, IOException, InterruptedException {
        // Phase 1: connector reads from the primary host.
        connectorConfig.put("database.hostname", mySqlController.getDatabaseHostname());
        connectController.deployConnector(connectorConfig);

        insertCustomer(mySqlController, "Alex", "master", "amaster@test.com");

        final String customersTopic = connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic, 10));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic, "amaster@test.com"));

        // Phase 2: wait for equal counts, reload the primary, then switch the connector to the replica host.
        awaitReplicaCaughtUp(mySqlReplicaController, mySqlController);
        mySqlController.reload();

        connectorConfig.put("database.hostname", mySqlReplicaController.getDatabaseHostname());
        connectController.deployConnector(connectorConfig);

        insertCustomer(mySqlController, "Tom", "Train", "ttrain@test.com");

        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic, 11));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic, "ttrain@test.com"));
    }

    /**
     * Blocks until the replica and the primary report the same customer count.
     *
     * <p>The counts are polled every 20 seconds; the overall limit is
     * {@code WaitConditions.scaled(5L)} minutes (presumably 5 minutes multiplied by a configured
     * factor — confirm in {@code WaitConditions}).
     *
     * @param replica controller whose customer count is read first on each poll
     * @param primary controller whose customer count is read second on each poll
     */
    private void awaitReplicaCaughtUp(MySqlReplicaController replica, MySqlController primary) {
        Awaitility.await()
                .atMost(WaitConditions.scaled(5L), TimeUnit.MINUTES)
                .pollInterval(Duration.ofSeconds(20L))
                .until(() -> {
                    int replicaRows = getCustomerCount(replica);
                    int primaryRows = getCustomerCount(primary);
                    return replicaRows == primaryRows;
                });
    }
}
