package io.debezium.server.redis;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.quarkus.test.junit.TestProfile;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.resps.StreamEntry;

@TestProfile(RedisStreamTestProfile.class)
@QuarkusIntegrationTest
@QuarkusTestResource(RedisTestResourceLifecycleManager.class)
/* loaded from: input_file:io/debezium/server/redis/RedisStreamIT.class */
public class RedisStreamIT {
    private PostgresConnection getPostgresConnection() {
        return new PostgresConnection(JdbcConfiguration.create().with("user", "postgres").with("password", "postgres").with("dbname", "postgres").with("hostname", "localhost").with("port", PostgresTestResourceLifecycleManager.getContainer().getMappedPort(PostgresTestResourceLifecycleManager.POSTGRES_PORT.intValue())).build(), "Debezium Redis Test");
    }

    private Long getStreamLength(Jedis jedis, String str, int i) {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(jedis.xlen(str) == ((long) i));
        });
        return Long.valueOf(jedis.xlen(str));
    }

    @Test
    public void testRedisStream() throws Exception {
        Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> {
            arrayList.addAll(jedis.xrange("testc.inventory.customers", (StreamEntryID) null, (StreamEntryID) null, 4));
            return Boolean.valueOf(arrayList.size() == 4);
        });
        Assert.assertTrue("Expected stream length of 4", Long.valueOf(jedis.xlen("testc.inventory.customers")).longValue() == 4);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Map fields = ((StreamEntry) it.next()).getFields();
            Assert.assertTrue("Expected map of size 1", fields.size() == 1);
            Map.Entry entry = (Map.Entry) fields.entrySet().iterator().next();
            Assert.assertTrue("Expected json like key starting with {\"schema\":...", ((String) entry.getKey()).startsWith("{\"schema\":"));
            Assert.assertTrue("Expected json like value starting with {\"schema\":...", ((String) entry.getValue()).startsWith("{\"schema\":"));
        }
        jedis.close();
    }

    @FixFor({"DBZ-4510"})
    @Test
    public void testRedisConnectionRetry() throws Exception {
        Testing.Print.enable();
        Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        Testing.print("Pausing container");
        RedisTestResourceLifecycleManager.pause();
        PostgresConnection postgresConnection = getPostgresConnection();
        Testing.print("Creating new redis_test table and inserting 5 records to it");
        postgresConnection.execute(new String[]{"CREATE TABLE inventory.redis_test (id INT PRIMARY KEY)", "INSERT INTO inventory.redis_test VALUES (1)", "INSERT INTO inventory.redis_test VALUES (2)", "INSERT INTO inventory.redis_test VALUES (3)", "INSERT INTO inventory.redis_test VALUES (4)", "INSERT INTO inventory.redis_test VALUES (5)"});
        postgresConnection.close();
        Testing.print("Sleeping for 3 seconds to simulate no connection errors");
        Thread.sleep(3000L);
        Testing.print("Unpausing container");
        RedisTestResourceLifecycleManager.unpause();
        Thread.sleep(2000L);
        Long valueOf = Long.valueOf(jedis.xlen("testc.inventory.redis_test"));
        Testing.print("Entries in testc.inventory.redis_test:" + valueOf);
        jedis.close();
        Assert.assertTrue("Redis Connection Test Failed", valueOf.longValue() == 5);
    }

    @FixFor({"DBZ-4510"})
    @Test
    public void testRedisOOMRetry() throws Exception {
        Testing.Print.enable();
        Jedis jedis = new Jedis(HostAndPort.from(RedisTestResourceLifecycleManager.getRedisContainerAddress()));
        Testing.print("Setting Redis' maxmemory to 1M");
        jedis.configSet("maxmemory", "1M");
        PostgresConnection postgresConnection = getPostgresConnection();
        postgresConnection.execute(new String[]{"CREATE TABLE inventory.redis_test2 (id VARCHAR(100) PRIMARY KEY, first_name VARCHAR(100), last_name VARCHAR(100))"});
        postgresConnection.execute(new String[]{String.format("INSERT INTO inventory.redis_test2 (id,first_name,last_name) SELECT LEFT(i::text, 10), RANDOM()::text, RANDOM()::text FROM generate_series(1,%d) s(i)", 50)});
        postgresConnection.commit();
        Thread.sleep(1000L);
        Testing.print("Entries in testc.inventory.redis_test2:" + jedis.xlen("testc.inventory.redis_test2"));
        Assert.assertTrue(jedis.xlen("testc.inventory.redis_test2") < 50);
        Thread.sleep(1000L);
        jedis.configSet("maxmemory", "0");
        Assert.assertTrue("Redis OOM Test Failed", getStreamLength(jedis, "testc.inventory.redis_test2", 50).longValue() == 50);
    }
}
