package io.trino.testing.kafka;

import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

/* loaded from: input_file:io/trino/testing/kafka/TestingKafkaWithSchemaRegistry.class */
public class TestingKafkaWithSchemaRegistry implements TestingKafka {
    private static final int SCHEMA_REGISTRY_PORT = 8081;
    private final TestingKafka delegate;
    private final GenericContainer<?> schemaRegistryContainer;
    private final Closer closer;

    public TestingKafkaWithSchemaRegistry() {
        this(TestingKafka.DEFAULT_CONFLUENT_PLATFORM_VERSION);
    }

    public TestingKafkaWithSchemaRegistry(String str) {
        this.closer = Closer.create();
        Objects.requireNonNull(str, "confluentPlatformVersion is null");
        this.delegate = new BasicTestingKafka(str);
        this.schemaRegistryContainer = new GenericContainer("confluentinc/cp-schema-registry:" + str).withNetwork(Network.SHARED).withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092").withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0").withEnv("SCHEMA_REGISTRY_LISTENERS", String.format("http://0.0.0.0:%s", Integer.valueOf(SCHEMA_REGISTRY_PORT))).withExposedPorts(new Integer[]{Integer.valueOf(SCHEMA_REGISTRY_PORT)});
        this.closer.register(this.delegate);
        Closer closer = this.closer;
        GenericContainer<?> genericContainer = this.schemaRegistryContainer;
        Objects.requireNonNull(genericContainer);
        closer.register(genericContainer::stop);
    }

    @Override // io.trino.testing.kafka.TestingKafka
    public void start() {
        this.delegate.start();
        this.schemaRegistryContainer.start();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closer.close();
    }

    @Override // io.trino.testing.kafka.TestingKafka
    public void createTopic(String str) {
        this.delegate.createTopic(str);
    }

    @Override // io.trino.testing.kafka.TestingKafka
    public void createTopicWithConfig(int i, int i2, String str, boolean z) {
        this.delegate.createTopicWithConfig(i, i2, str, z);
    }

    @Override // io.trino.testing.kafka.TestingKafka
    public String getConnectString() {
        return this.delegate.getConnectString();
    }

    @Override // io.trino.testing.kafka.TestingKafka
    public <K, V> KafkaProducer<K, V> createProducer(Map<String, String> map) {
        Properties properties = new Properties();
        properties.put("schema.registry.url", getSchemaRegistryConnectString());
        properties.put("bootstrap.servers", this.delegate.getConnectString());
        properties.put("key.serializer", LongSerializer.class.getName());
        properties.put("value.serializer", KafkaAvroSerializer.class.getName());
        properties.putAll(map);
        return new KafkaProducer<>(properties);
    }

    public String getSchemaRegistryConnectString() {
        return "http://" + this.schemaRegistryContainer.getContainerIpAddress() + ":" + this.schemaRegistryContainer.getMappedPort(SCHEMA_REGISTRY_PORT);
    }

    public <T> KafkaProducer<T, GenericRecord> createConfluentProducer() {
        return createConfluentProducer(ImmutableMap.of());
    }

    public <T> KafkaProducer<T, GenericRecord> createConfluentProducer(Map<String, String> map) {
        HashMap hashMap = new HashMap(map);
        hashMap.putIfAbsent("key.serializer", KafkaAvroSerializer.class.getName());
        return createProducer(hashMap);
    }
}
