package org.creekservice.api.kafka.serde.test;

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.creekservice.api.kafka.extension.ClientsExtensionOptions;
import org.creekservice.api.kafka.extension.KafkaClientsExtension;
import org.creekservice.api.kafka.extension.KafkaClientsExtensionOptions;
import org.creekservice.api.kafka.extension.resource.KafkaTopic;
import org.creekservice.api.kafka.metadata.topic.CreatableKafkaTopic;
import org.creekservice.api.kafka.metadata.topic.KafkaTopicDescriptor;
import org.creekservice.api.platform.metadata.ComponentInput;
import org.creekservice.api.platform.metadata.ComponentInternal;
import org.creekservice.api.platform.metadata.ComponentOutput;
import org.creekservice.api.platform.metadata.ServiceDescriptor;
import org.creekservice.api.service.context.CreekContext;
import org.creekservice.api.service.context.CreekServices;
import org.creekservice.api.service.extension.CreekExtensionOptions;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/creekservice/api/kafka/serde/test/KafkaSerdeProviderFunctionalFixture.class */
public final class KafkaSerdeProviderFunctionalFixture {
    private static final int CONTAINER_STARTUP_ATTEMPTS = 3;
    private final Map<String, KafkaContainer> brokerByCluster;
    private final List<CreatableKafkaTopic<?, ?>> topics;
    private static final DockerImageName KAFKA_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka:7.3.1");
    private static final Duration CONTAINER_STARTUP_TIMEOUT = Duration.ofSeconds(90);
    private final Network network = Network.newNetwork();
    private final List<CreekExtensionOptions> options = new ArrayList();
    private final List<Tester> testers = new ArrayList();

    /* loaded from: input_file:org/creekservice/api/kafka/serde/test/KafkaSerdeProviderFunctionalFixture$TestServiceDescriptor.class */
    private static final class TestServiceDescriptor implements ServiceDescriptor {
        private final List<CreatableKafkaTopic<?, ?>> topics;

        TestServiceDescriptor(Collection<? extends CreatableKafkaTopic<?, ?>> collection) {
            this.topics = List.copyOf(collection);
        }

        public String dockerImage() {
            return "not image - test descriptor";
        }

        public Collection<ComponentInput> inputs() {
            return topics(ComponentInput.class);
        }

        public Collection<ComponentInternal> internals() {
            return topics(ComponentInternal.class);
        }

        public Collection<ComponentOutput> outputs() {
            return topics(ComponentOutput.class);
        }

        private <T> Collection<T> topics(Class<T> cls) {
            Stream<CreatableKafkaTopic<?, ?>> stream = this.topics.stream();
            Objects.requireNonNull(cls);
            Stream<CreatableKafkaTopic<?, ?>> filter = stream.filter((v1) -> {
                return r1.isInstance(v1);
            });
            Objects.requireNonNull(cls);
            return (Collection) filter.map((v1) -> {
                return r1.cast(v1);
            }).collect(Collectors.toList());
        }
    }

    /* loaded from: input_file:org/creekservice/api/kafka/serde/test/KafkaSerdeProviderFunctionalFixture$Tester.class */
    public final class Tester implements Closeable {
        private final CreekContext creek;
        private final KafkaClientsExtension extension;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        private Tester(Collection<? extends CreatableKafkaTopic<?, ?>> collection) {
            CreekServices.Builder with = CreekServices.builder(new TestServiceDescriptor(collection)).with(KafkaSerdeProviderFunctionalFixture.this.kafkaOptions());
            List<CreekExtensionOptions> list = KafkaSerdeProviderFunctionalFixture.this.options;
            Objects.requireNonNull(with);
            list.forEach(with::with);
            this.creek = with.build();
            this.extension = this.creek.extension(KafkaClientsExtension.class);
            KafkaSerdeProviderFunctionalFixture.this.testers.add(this);
        }

        public <K, V> KafkaTopic<K, V> topic(KafkaTopicDescriptor<K, V> kafkaTopicDescriptor) {
            return this.extension.topic(kafkaTopicDescriptor);
        }

        public <K, V> void produce(CreatableKafkaTopic<K, V> creatableKafkaTopic, K k, V v) {
            produceToTopic(topic(creatableKafkaTopic), k, v);
        }

        public <K, V> void consume(CreatableKafkaTopic<K, V> creatableKafkaTopic, K k, V v) {
            KafkaTopic<K, V> kafkaTopic = topic(creatableKafkaTopic);
            ConsumerRecord<byte[], byte[]> consumeFromTopic = consumeFromTopic(kafkaTopic);
            assertEqual(kafkaTopic.deserializeKey((byte[]) consumeFromTopic.key()), k);
            assertEqual(kafkaTopic.deserializeValue((byte[]) consumeFromTopic.value()), v);
        }

        public <K, V> void testProduceConsume(CreatableKafkaTopic<K, V> creatableKafkaTopic, K k, V v) {
            produce(creatableKafkaTopic, k, v);
            consume(creatableKafkaTopic, k, v);
        }

        public Tester testEvolution(CreatableKafkaTopic<?, ?> creatableKafkaTopic) {
            return new Tester(List.of(creatableKafkaTopic));
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (this.closed.compareAndSet(false, true)) {
                this.creek.close();
            }
        }

        private <V, K> void produceToTopic(KafkaTopic<K, V> kafkaTopic, K k, V v) {
            Producer producer = this.extension.producer(kafkaTopic.descriptor().cluster());
            producer.send(new ProducerRecord(kafkaTopic.name(), kafkaTopic.serializeKey(k), kafkaTopic.serializeValue(v)));
            producer.flush();
        }

        private ConsumerRecord<byte[], byte[]> consumeFromTopic(KafkaTopic<?, ?> kafkaTopic) {
            Consumer consumer = this.extension.consumer(kafkaTopic.descriptor().cluster());
            consumer.assign((List) IntStream.range(0, kafkaTopic.descriptor().config().partitions()).mapToObj(i -> {
                return new TopicPartition(kafkaTopic.name(), i);
            }).collect(Collectors.toList()));
            for (int i2 = 0; i2 != 30; i2++) {
                Iterator it = consumer.poll(Duration.ofSeconds(1L)).records(kafkaTopic.name()).iterator();
                if (it.hasNext()) {
                    return (ConsumerRecord) it.next();
                }
            }
            throw new AssertionError("Timed out waiting for record in " + kafkaTopic.name());
        }

        private <T> void assertEqual(T t, T t2) {
            if (!t2.equals(t)) {
                throw new AssertionError("Key mismatch." + System.lineSeparator() + "Expected: " + String.valueOf(t2) + System.lineSeparator() + "Got: " + String.valueOf(t));
            }
        }
    }

    public static KafkaSerdeProviderFunctionalFixture tester(Collection<? extends CreatableKafkaTopic<?, ?>> collection) {
        return new KafkaSerdeProviderFunctionalFixture(collection);
    }

    private KafkaSerdeProviderFunctionalFixture(Collection<? extends CreatableKafkaTopic<?, ?>> collection) {
        this.topics = List.copyOf((Collection) Objects.requireNonNull(collection, "topicDescriptors"));
        this.brokerByCluster = (Map) this.topics.stream().map((v0) -> {
            return v0.cluster();
        }).distinct().collect(Collectors.toMap(Function.identity(), str -> {
            return new KafkaContainer(KAFKA_IMAGE_NAME).withNetwork(this.network).withNetworkAliases(new String[]{str + "-kafka-broker"}).withStartupAttempts(CONTAINER_STARTUP_ATTEMPTS).withStartupTimeout(CONTAINER_STARTUP_TIMEOUT).withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false");
        }));
    }

    public KafkaSerdeProviderFunctionalFixture withExtensionOption(CreekExtensionOptions creekExtensionOptions) {
        if (creekExtensionOptions instanceof ClientsExtensionOptions) {
            throw new IllegalArgumentException("Client options are handled by the fixture");
        }
        this.options.add(creekExtensionOptions);
        return this;
    }

    public Tester start() {
        this.brokerByCluster.values().forEach((v0) -> {
            v0.start();
        });
        return new Tester(this.topics);
    }

    public void stop() {
        this.testers.forEach((v0) -> {
            v0.close();
        });
        this.testers.clear();
        this.brokerByCluster.values().forEach((v0) -> {
            v0.stop();
        });
        this.brokerByCluster.clear();
    }

    public KafkaContainer kafkaContainer(String str) {
        KafkaContainer kafkaContainer = this.brokerByCluster.get(str);
        if (kafkaContainer == null) {
            throw new IllegalArgumentException("Unknown cluster: " + str);
        }
        return kafkaContainer;
    }

    private KafkaClientsExtensionOptions kafkaOptions() {
        KafkaClientsExtensionOptions.Builder withKafkaProperty = KafkaClientsExtensionOptions.builder().withKafkaProperty("auto.offset.reset", "earliest").withKafkaProperty("group.id", UUID.randomUUID().toString());
        this.brokerByCluster.forEach((str, kafkaContainer) -> {
            withKafkaProperty.withKafkaProperty(str, "bootstrap.servers", kafkaContainer.getBootstrapServers());
        });
        return withKafkaProperty.build();
    }
}
