package io.debezium.server.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List({@QuarkusTestResource(PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(RabbitMqTestResourceLifecycleManager.class)})
/* loaded from: input_file:io/debezium/server/rabbitmq/RabbitMqIT.class */
public class RabbitMqIT {
    private static final int MESSAGE_COUNT = 4;
    private static Connection connection;
    private static Channel channel = null;
    private static final List<String> messages = Collections.synchronizedList(new ArrayList());

    public RabbitMqIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(RabbitMqTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent connectorStartedEvent) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitMqTestResourceLifecycleManager.container.getHost());
        connectionFactory.setPort(RabbitMqTestResourceLifecycleManager.getPort());
        connection = connectionFactory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare(RabbitMqTestConfigSource.TOPIC_NAME, BuiltinExchangeType.DIRECT);
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, RabbitMqTestConfigSource.TOPIC_NAME, "");
        channel.basicConsume(queue, new DefaultConsumer(channel) { // from class: io.debezium.server.rabbitmq.RabbitMqIT.1
            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                RabbitMqIT.messages.add(new String(bArr, StandardCharsets.UTF_8));
            }
        });
    }

    @AfterAll
    static void stop() throws IOException, TimeoutException {
        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw new RuntimeException((Throwable) connectorCompletedEvent.getError().get());
        }
    }

    @Test
    public void testRabbitMq() {
        Awaitility.await().atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())).until(() -> {
            return Boolean.valueOf(messages.size() >= MESSAGE_COUNT);
        });
        Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
    }
}
