package io.trino.plugin.eventlistener.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import io.trino.spi.eventlistener.EventListener;
import io.trino.spi.eventlistener.EventListenerFactory;
import io.trino.testing.kafka.TestingKafka;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerPlugin.class */
final class TestKafkaEventListenerPlugin {
    private static final String CREATED_TOPIC = "query_created";
    private static final String COMPLETED_TOPIC = "query_completed";
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static TestingKafka testingKafka;

    TestKafkaEventListenerPlugin() {
    }

    @BeforeAll
    public static void setUp() {
        testingKafka = TestingKafka.create();
        testingKafka.start();
        testingKafka.createTopic(CREATED_TOPIC);
        testingKafka.createTopic(COMPLETED_TOPIC);
    }

    @AfterAll
    public static void teardown() throws IOException {
        testingKafka.close();
    }

    @Test
    void testEventListenerEndToEnd() throws JsonProcessingException {
        EventListener create = ((EventListenerFactory) Iterables.getOnlyElement(new KafkaEventListenerPlugin().getEventListenerFactories())).create(ImmutableMap.builder().put("kafka-event-listener.publish-created-event", "true").put("kafka-event-listener.publish-completed-event", "true").put("kafka-event-listener.broker-endpoints", testingKafka.getConnectString()).put("kafka-event-listener.created-event.topic", CREATED_TOPIC).put("kafka-event-listener.completed-event.topic", COMPLETED_TOPIC).put("kafka-event-listener.env-var-prefix", "INSIGHTS_").put("kafka-event-listener.request-timeout", "30s").put("kafka-event-listener.excluded-fields", "ioMetadata").put("kafka.security-protocol", "plaintext").buildOrThrow());
        try {
            create.queryCreated(TestUtils.queryCreatedEvent);
            JsonNode readTree = MAPPER.readTree((String) ((ConsumerRecord) Iterables.getOnlyElement(pollJsonRecords(CREATED_TOPIC))).value());
            Assertions.assertThat(readTree.get("eventMetadata")).isNotNull();
            JsonNode jsonNode = readTree.get("eventPayload");
            Assertions.assertThat(jsonNode).isNotNull();
            Assertions.assertThat(jsonNode.get("context")).isNotNull();
            Assertions.assertThat(jsonNode.get("metadata")).isNotNull();
            Assertions.assertThat(jsonNode.get("metadata").get("queryId").textValue()).isEqualTo(TestUtils.queryCreatedEvent.getMetadata().getQueryId());
            create.queryCompleted(TestUtils.queryCompletedEvent);
            JsonNode readTree2 = MAPPER.readTree((String) ((ConsumerRecord) Iterables.getOnlyElement(pollJsonRecords(COMPLETED_TOPIC))).value());
            Assertions.assertThat(readTree2.get("eventMetadata")).isNotNull();
            JsonNode jsonNode2 = readTree2.get("eventPayload");
            Assertions.assertThat(jsonNode2).isNotNull();
            Assertions.assertThat(jsonNode2.get("context")).isNotNull();
            Assertions.assertThat(jsonNode2.get("metadata")).isNotNull();
            Assertions.assertThat(jsonNode2.get("statistics")).isNotNull();
            Assertions.assertThat(jsonNode2.get("warnings")).isNotNull();
            Assertions.assertThat(jsonNode2.get("ioMetadata")).isNull();
            Assertions.assertThat(jsonNode2.get("metadata").get("queryId").textValue()).isEqualTo(TestUtils.queryCompletedEvent.getMetadata().getQueryId());
            create.shutdown();
        } catch (Throwable th) {
            create.shutdown();
            throw th;
        }
    }

    private ConsumerRecords<String, String> pollJsonRecords(String str) {
        KafkaConsumer<String, String> createConsumer = createConsumer();
        try {
            createConsumer.subscribe(ImmutableList.of(str));
            ConsumerRecords<String, String> poll = createConsumer.poll(Duration.of(5L, ChronoUnit.SECONDS));
            if (createConsumer != null) {
                createConsumer.close();
            }
            return poll;
        } catch (Throwable th) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private KafkaConsumer<String, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", testingKafka.getConnectString());
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("group.id", "test-consumer");
        properties.put("auto.offset.reset", "earliest");
        return new KafkaConsumer<>(properties);
    }
}
