package io.trino.testing.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import io.airlift.log.Logger;
import io.trino.testing.ResourcePresence;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:io/trino/testing/kafka/TestingKafka.class */
/**
 * Test helper that runs a Confluent Platform Kafka broker, and optionally a
 * Schema Registry, in Docker containers attached to a dedicated network.
 *
 * <p>Instances are obtained through the static factories, started with
 * {@link #start()} and released with {@link #close()}. Once closed, an
 * instance cannot be started again.
 */
public final class TestingKafka implements Closeable {
    private static final Logger log = Logger.get(TestingKafka.class);

    private static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "5.5.2";
    // Container port whose host mapping is handed out to clients by getConnectString().
    private static final int KAFKA_PORT = 9093;
    private static final int SCHEMA_REGISTRY_PORT = 8081;

    private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka");
    private static final DockerImageName SCHEMA_REGISTRY_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-schema-registry");

    private final Closer closer = Closer.create();
    private final Network network = Network.newNetwork();
    private final KafkaContainer kafka;
    private final GenericContainer<?> schemaRegistry;
    private final boolean withSchemaRegistry;
    private boolean stopped;

    /**
     * Creates a Kafka-only instance using the default Confluent Platform version.
     *
     * @return a new, not yet started instance
     */
    public static TestingKafka create() {
        return create(DEFAULT_CONFLUENT_PLATFORM_VERSION);
    }

    /**
     * Creates a Kafka-only instance using the given Confluent Platform version.
     *
     * @param confluentPlatformVersion image tag applied to the Kafka image
     * @return a new, not yet started instance
     */
    public static TestingKafka create(String confluentPlatformVersion) {
        return new TestingKafka(confluentPlatformVersion, false);
    }

    /**
     * Creates an instance that also starts a Schema Registry container, using
     * the default Confluent Platform version.
     *
     * @return a new, not yet started instance
     */
    public static TestingKafka createWithSchemaRegistry() {
        return new TestingKafka(DEFAULT_CONFLUENT_PLATFORM_VERSION, true);
    }

    private TestingKafka(String confluentPlatformVersion, boolean withSchemaRegistry) {
        Objects.requireNonNull(confluentPlatformVersion, "confluentPlatformVersion is null");
        this.withSchemaRegistry = withSchemaRegistry;

        // Registered first; the containers that use the network are registered afterwards.
        closer.register(network::close);

        MountableFile kafkaLogProperties = MountableFile.forClasspathResource("log4j-kafka.properties.template");
        MountableFile schemaRegistryLogProperties = MountableFile.forClasspathResource("log4j-schema-registry.properties.template");

        kafka = new KafkaContainer(KAFKA_IMAGE_NAME.withTag(confluentPlatformVersion))
                .withStartupAttempts(3)
                .withNetwork(network)
                .withNetworkAliases("kafka")
                .withCopyFileToContainer(kafkaLogProperties, "/etc/confluent/docker/log4j.properties.template");

        // The schema registry container is always configured, but only started
        // when withSchemaRegistry is set (see start()).
        schemaRegistry = new GenericContainer<>(SCHEMA_REGISTRY_IMAGE_NAME.withTag(confluentPlatformVersion))
                .withStartupAttempts(3)
                .withNetwork(network)
                .withNetworkAliases("schema-registry")
                .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
                .withEnv("SCHEMA_REGISTRY_HOST_NAME", "0.0.0.0")
                .withEnv("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081")
                .withEnv("SCHEMA_REGISTRY_HEAP_OPTS", "-Xmx1G")
                .withExposedPorts(SCHEMA_REGISTRY_PORT)
                .withCopyFileToContainer(schemaRegistryLogProperties, "/etc/confluent/docker/log4j.properties.template")
                .dependsOn(kafka);

        closer.register(kafka::stop);
        closer.register(schemaRegistry::stop);

        try {
            // Container logs are written straight to the stdout file descriptor
            // rather than through System.out. The stream is intentionally left
            // open: closing it would close the process's standard output.
            PrintStream out = new PrintStream(new FileOutputStream(FileDescriptor.out), true, Charset.defaultCharset().name());
            kafka.withLogConsumer(new PrintingLogConsumer(out, String.format("%-20s| ", "kafka")));
            schemaRegistry.withLogConsumer(new PrintingLogConsumer(out, String.format("%-20s| ", "schema-registry")));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Starts the Kafka container and, if requested at creation time, the
     * Schema Registry container.
     *
     * @throws IllegalStateException if this instance has already been closed
     */
    public void start() {
        Preconditions.checkState(!stopped, "Cannot start again");
        kafka.start();
        if (withSchemaRegistry) {
            schemaRegistry.start();
        }
    }

    /**
     * Stops the containers, closes the network and marks this instance as stopped.
     *
     * @throws IOException if releasing any of the registered resources fails
     */
    @Override
    public void close() throws IOException {
        closer.close();
        stopped = true;
    }

    /**
     * Reports whether this instance has not been closed yet.
     *
     * @return {@code true} until {@link #close()} has completed
     */
    @ResourcePresence
    public boolean isNotStopped() {
        return !stopped;
    }

    /**
     * Creates a topic with 2 partitions and a replication factor of 1.
     *
     * @param topic name of the topic to create
     */
    public void createTopic(String topic) {
        createTopic(2, 1, topic);
    }

    // Creates a topic with the broker's default timestamp type. Delegates so that
    // the kafka-topics command line (including --create and --zookeeper) is
    // assembled in exactly one place.
    private void createTopic(int partitions, int replication, String topic) {
        createTopicWithConfig(partitions, replication, topic, false);
    }

    /**
     * Creates a topic by running {@code kafka-topics --create} inside the Kafka container.
     *
     * @param partitions number of partitions
     * @param replication replication factor
     * @param topic name of the topic to create
     * @param enableLogAppendTime when {@code true}, the topic is created with
     *        {@code message.timestamp.type=LogAppendTime}
     * @throws RuntimeException if executing the command in the container fails
     *         or the calling thread is interrupted
     */
    public void createTopicWithConfig(int partitions, int replication, String topic, boolean enableLogAppendTime) {
        ArrayList<String> command = new ArrayList<>();
        command.add("kafka-topics");
        command.add("--create");
        command.add("--topic");
        command.add(topic);
        command.add("--partitions");
        command.add(Integer.toString(partitions));
        command.add("--replication-factor");
        command.add(Integer.toString(replication));
        command.add("--zookeeper");
        command.add("localhost:2181");
        if (enableLogAppendTime) {
            command.add("--config");
            command.add("message.timestamp.type=LogAppendTime");
        }

        try {
            kafka.execInContainer(command.toArray(new String[0]));
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Sends the given records using a producer with default settings.
     *
     * @param recordStream records to send
     * @return metadata of the last record sent, or {@code null} if the stream was empty
     */
    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> recordStream) {
        return sendMessages(recordStream, ImmutableMap.of());
    }

    /**
     * Sends the given records using a freshly created producer, flushes it and
     * waits for the last record to be acknowledged.
     *
     * @param recordStream records to send
     * @param extraProducerProperties producer properties that take precedence over the defaults
     * @return metadata of the last record sent, or {@code null} if the stream was empty
     * @throws RuntimeException if the last send fails or the calling thread is interrupted
     */
    public <K, V> RecordMetadata sendMessages(Stream<ProducerRecord<K, V>> recordStream, Map<String, String> extraProducerProperties) {
        // try-with-resources closes the producer on the failure paths as well.
        try (KafkaProducer<K, V> producer = createProducer(extraProducerProperties)) {
            // Only the future of the last record is kept.
            Future<RecordMetadata> lastSend = recordStream
                    .map(record -> send(producer, record))
                    .reduce((previous, current) -> current)
                    .orElseGet(() -> Futures.immediateFuture(null));
            producer.flush();
            return lastSend.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    // Hands a single record to the producer, retrying up to 10 attempts with a
    // backoff between 1 ms and 10 s when the send call itself throws.
    private <K, V> Future<RecordMetadata> send(KafkaProducer<K, V> producer, ProducerRecord<K, V> record) {
        return Failsafe.with(
                new RetryPolicy<>()
                        .onRetry(event -> log.warn(event.getLastFailure(), "Retrying message send"))
                        .withMaxAttempts(10)
                        .withBackoff(1L, 10000L, ChronoUnit.MILLIS))
                .get(() -> producer.send(record));
    }

    /**
     * Returns the {@code host:port} address for connecting to the broker from the host.
     *
     * @return the container host joined with the host port mapped to container port 9093
     */
    public String getConnectString() {
        return kafka.getHost() + ":" + kafka.getMappedPort(KAFKA_PORT);
    }

    // Builds a producer from the caller's properties, filling in defaults for
    // any setting the caller did not provide.
    private <K, V> KafkaProducer<K, V> createProducer(Map<String, String> extraProperties) {
        Map<String, String> properties = new HashMap<>(extraProperties);
        properties.putIfAbsent("bootstrap.servers", getConnectString());
        properties.putIfAbsent("key.serializer", LongSerializer.class.getName());
        properties.putIfAbsent("value.serializer", JsonSerializer.class.getName());
        properties.putIfAbsent("partitioner.class", NumberPartitioner.class.getName());
        properties.putIfAbsent("acks", "1");
        return new KafkaProducer<>(toProperties(properties));
    }

    // Copies a string map into a java.util.Properties instance.
    private static Properties toProperties(Map<String, String> map) {
        Properties properties = new Properties();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            properties.setProperty(entry.getKey(), entry.getValue());
        }
        return properties;
    }

    /**
     * Returns the HTTP URL for connecting to the Schema Registry from the host.
     *
     * @return {@code http://host:port} using the host port mapped to container port 8081
     */
    public String getSchemaRegistryConnectString() {
        return "http://" + schemaRegistry.getHost() + ":" + schemaRegistry.getMappedPort(SCHEMA_REGISTRY_PORT);
    }

    /**
     * Returns the network the containers are attached to.
     *
     * @return the network created for this instance
     */
    public Network getNetwork() {
        return network;
    }
}
