package io.debezium.storage.jdbc;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.storage.jdbc.history.JdbcSchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/storage/jdbc/JdbcOffsetBackingStoreIT.class */
public class JdbcOffsetBackingStoreIT extends AbstractAsyncEngineConnectorTest {
    private static final String USER = "debezium";
    private static final String PASSWORD = "dbz";
    private static final String ROOT_PASSWORD = "debezium";
    private static final String DBNAME = "inventory";
    private static final String TOPIC_PREFIX = "test";
    private static final String TABLE_NAME = "schematest";
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("schema-history.db").toAbsolutePath();
    private static final Integer PORT = 3306;
    private static final String IMAGE = "quay.io/debezium/example-mysql";
    private static final String PRIVILEGED_USER = "mysqluser";
    private static final String PRIVILEGED_PASSWORD = "mysqlpassword";
    private static final GenericContainer<?> container = new GenericContainer(IMAGE).waitingFor(Wait.forLogMessage(".*mysqld: ready for connections.*", 2)).withEnv("MYSQL_ROOT_PASSWORD", "debezium").withEnv("MYSQL_USER", PRIVILEGED_USER).withEnv("MYSQL_PASSWORD", PRIVILEGED_PASSWORD).withExposedPorts(new Integer[]{PORT}).withStartupTimeout(Duration.ofSeconds(180));

    @BeforeClass
    public static void startDatabase() {
        container.start();
    }

    @AfterClass
    public static void stopDatabase() {
        container.stop();
    }

    @Before
    public void beforeEach() throws SQLException {
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        JdbcConnection testConnection = testConnection();
        try {
            testConnection.execute(new String[]{"DROP TABLE IF EXISTS schematest", "CREATE TABLE schematest (id INT PRIMARY KEY, val VARCHAR(16))", "INSERT INTO schematest VALUES (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four')"});
            if (testConnection != null) {
                testConnection.close();
            }
            stopConnector();
        } catch (Throwable th) {
            if (testConnection != null) {
                try {
                    testConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @After
    public void afterEach() throws SQLException {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            JdbcConnection testConnection = testConnection();
            try {
                testConnection.execute(new String[]{"DROP TABLE IF EXISTS schematest"});
                if (testConnection != null) {
                    testConnection.close();
                }
            } catch (Throwable th) {
                if (testConnection != null) {
                    try {
                        testConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th3;
        }
    }

    protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
        return builder.with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_JDBC_URL.name(), "jdbc:sqlite:" + String.valueOf(SCHEMA_HISTORY_PATH)).with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_USER.name(), "user").with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_PASSWORD.name(), "pass");
    }

    private Configuration.Builder config(String str) {
        return schemaHistory(Configuration.create().with(MySqlConnectorConfig.HOSTNAME, container.getHost()).with(MySqlConnectorConfig.PORT, container.getMappedPort(PORT.intValue())).with(MySqlConnectorConfig.USER, "debezium").with(MySqlConnectorConfig.PASSWORD, PASSWORD).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DBNAME).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "inventory.schematest").with(MySqlConnectorConfig.SERVER_ID, 18765).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10).with(MySqlConnectorConfig.SCHEMA_HISTORY, JdbcSchemaHistory.class).with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX).with(MySqlConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.INITIAL).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_JDBC_URL.name(), str).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_USER.name(), "user").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_PASSWORD.name(), "pass").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name(), "offsets_jdbc").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_DDL.name(), "CREATE TABLE %s(id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), offset_val VARCHAR(1255),record_insert_ts TIMESTAMP NOT NULL,record_insert_seq INTEGER NOT NULL)").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_SELECT.name(), "SELECT id, offset_key, offset_val FROM %s ORDER BY record_insert_ts, record_insert_seq").with("offset.flush.interval.ms", "1000").with("offset.storage", "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"));
    }

    private JdbcConnection testConnection() {
        return new JdbcConnection(JdbcConfiguration.create().withHostname(container.getHost()).withPort(container.getMappedPort(PORT.intValue()).intValue()).withUser(PRIVILEGED_USER).withPassword(PRIVILEGED_PASSWORD).withDatabase(DBNAME).build(), JdbcConnection.patternBasedFactory("jdbc:mysql://${hostname}:${port}/${dbname}", new Field[0]), "`", "`");
    }

    @Test
    public void shouldStartCorrectlyWithJdbcOffsetStorage() throws InterruptedException, IOException {
        if (!System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"))) {
            Thread.sleep(5000L);
        }
        String format = String.format("jdbc:sqlite:%s", File.createTempFile("test-", "db").getAbsolutePath());
        start(MySqlConnector.class, config(format).build());
        waitForStreamingRunning("mysql", TOPIC_PREFIX);
        consumeRecordsByTopic(4);
        validateIfDataIsCreatedInJDBCDatabase(format, "user", "pass", "offsets_jdbc");
    }

    private void validateIfDataIsCreatedInJDBCDatabase(String str, String str2, String str3, String str4) {
        try {
            Statement createStatement = DriverManager.getConnection(str, str2, str3).createStatement();
            createStatement.setQueryTimeout(30);
            ResultSet executeQuery = createStatement.executeQuery(String.format("select * from %s", str4));
            while (executeQuery.next()) {
                String string = executeQuery.getString("offset_key");
                String string2 = executeQuery.getString("offset_val");
                String string3 = executeQuery.getString("record_insert_ts");
                String string4 = executeQuery.getString("record_insert_seq");
                Assert.assertFalse(string.isBlank() && string.isEmpty());
                Assert.assertFalse(string2.isBlank() && string2.isEmpty());
                Assert.assertFalse(string3.isBlank() && string3.isEmpty());
                Assert.assertFalse(string4.isBlank() && string4.isEmpty());
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}
