package io.debezium.server.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.stream.Consumer;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.OffsetSpecification;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

@QuarkusTest
@EnabledIfSystemProperty(named = "debezium.sink.type", matches = "rabbitmqstream")
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(RabbitMqStreamTestResourceLifecycleManager.class)})
/* loaded from: input_file:io/debezium/server/rabbitmq/RabbitMqStreamIT.class */
public class RabbitMqStreamIT {
    private static final int MESSAGE_COUNT = 4;
    private static Environment environment;

    @ConfigProperty(name = "debezium.source.database.hostname")
    String dbHostname;

    @ConfigProperty(name = "debezium.source.database.port")
    String dbPort;

    @ConfigProperty(name = "debezium.source.database.user")
    String dbUser;

    @ConfigProperty(name = "debezium.source.database.password")
    String dbPassword;

    @ConfigProperty(name = "debezium.source.database.dbname")
    String dbName;
    private static Consumer consumer = null;
    private static final List<String> messages = Collections.synchronizedList(new ArrayList());

    public RabbitMqStreamIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(RabbitMqTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMqStreamTestResourceLifecycleManager.container.getHost());
        connectionFactory.setPort(RabbitMqStreamTestResourceLifecycleManager.getPort());
        environment = Environment.builder().host(connectionFactory.getHost()).port(connectionFactory.getPort()).build();
        environment.streamCreator().stream(RabbitMqTestConfigSource.TOPIC_NAME).create();
        environment.consumerBuilder().stream(RabbitMqTestConfigSource.TOPIC_NAME).offset(OffsetSpecification.first()).messageHandler((context, message) -> {
            messages.add(new String(message.getBodyAsBinary()));
        }).build();
    }

    @AfterAll
    static void stop() throws IOException, TimeoutException {
        if (consumer != null) {
            consumer.close();
        }
        if (environment != null) {
            environment.close();
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw new RuntimeException((Throwable) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testRabbitMqStream() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
        messages.clear();
        PostgresConnection postgresConnection = new PostgresConnection(JdbcConfiguration.create().with("hostname", this.dbHostname).with("port", this.dbPort).with("user", this.dbUser).with("password", this.dbPassword).with("dbname", this.dbName).build(), "Debezium RabbitMQ Stream Test");
        try {
            postgresConnection.execute(new String[]{"INSERT INTO inventory.customers VALUES (10000, 'John', 'Doe', 'jdoe@example.org')", "DELETE FROM inventory.customers WHERE id=10000"});
            postgresConnection.close();
            Awaitility.await().atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())).until(() -> {
                return Boolean.valueOf(messages.size() >= 3);
            });
            Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(3);
            Assertions.assertThat(messages.get(2)).isEqualTo("default");
        } catch (Throwable th) {
            try {
                postgresConnection.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
