package org.creekservice.internal.kafka.streams.test.extension.testsuite;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.creekservice.api.base.type.Preconditions;
import org.creekservice.api.platform.metadata.ServiceDescriptor;
import org.creekservice.api.system.test.extension.CreekSystemTest;
import org.creekservice.api.system.test.extension.test.env.listener.TestEnvironmentListener;
import org.creekservice.api.system.test.extension.test.env.suite.service.ConfigurableServiceInstance;
import org.creekservice.api.system.test.extension.test.env.suite.service.ServiceInstance;
import org.creekservice.api.system.test.extension.test.model.CreekTestSuite;
import org.creekservice.api.system.test.extension.test.model.TestSuiteResult;
import org.creekservice.internal.kafka.extension.resource.TopicCollector;
import org.creekservice.internal.kafka.streams.test.extension.ClusterEndpointsProvider;
import org.creekservice.internal.kafka.streams.test.extension.handler.TestOptionsAccessor;
import org.creekservice.internal.kafka.streams.test.extension.model.KafkaOptions;

/* loaded from: input_file:org/creekservice/internal/kafka/streams/test/extension/testsuite/StartKafkaTestListener.class */
/**
 * Test environment listener that starts the Kafka clusters needed by a test suite.
 *
 * <p>Before a suite runs, the clusters referenced by the topics of each service under test are
 * collected, one {@link KafkaContainerDef} instance is added and started per cluster, and the
 * services using that cluster are given the environment variables needed to reach it. After the
 * suite, the published endpoints are reset and every started instance is stopped.
 */
public final class StartKafkaTestListener implements TestEnvironmentListener {
    private final CreekSystemTest api;
    private final TopicCollector topicCollector;
    private final ClusterEndpointsProvider clusterEndpointsProvider;

    /** Kafka instances started for the current suite, keyed by cluster name. */
    private final Map<String, ServiceInstance> kafkaInstances = new HashMap<>();

    /** Pairs a cluster name with a single service instance that requires that cluster. */
    public static final class ClusterInstance {
        private final String clusterName;
        private final ConfigurableServiceInstance service;

        /**
         * @param clusterName the name of the cluster; must not be blank.
         * @param service the service instance that uses the cluster; must not be null.
         */
        ClusterInstance(String clusterName, ConfigurableServiceInstance service) {
            this.clusterName = Preconditions.requireNonBlank(clusterName, "cluster");
            this.service = Objects.requireNonNull(service, "service");
        }

        /** @return the name of the cluster. */
        public String clusterName() {
            return this.clusterName;
        }

        /** @return the service instance that requires the cluster. */
        public ConfigurableServiceInstance service() {
            return this.service;
        }
    }

    /**
     * @param creekSystemTest the system test API used to access the current suite's services.
     * @param clusterEndpointsProvider receives the endpoints of each started cluster.
     */
    public StartKafkaTestListener(CreekSystemTest creekSystemTest, ClusterEndpointsProvider clusterEndpointsProvider) {
        this(creekSystemTest, clusterEndpointsProvider, new TopicCollector());
    }

    /**
     * Constructor that additionally accepts the topic collector to use.
     *
     * @param creekSystemTest the system test API; must not be null.
     * @param clusterEndpointsProvider receives the endpoints of each started cluster; must not be null.
     * @param topicCollector used to work out which clusters a service's topics live in; must not be null.
     */
    StartKafkaTestListener(CreekSystemTest creekSystemTest, ClusterEndpointsProvider clusterEndpointsProvider, TopicCollector topicCollector) {
        this.api = Objects.requireNonNull(creekSystemTest, "api");
        this.topicCollector = Objects.requireNonNull(topicCollector, "topicCollector");
        this.clusterEndpointsProvider = Objects.requireNonNull(clusterEndpointsProvider, "clusterEndpointsProvider");
    }

    /**
     * Starts one Kafka instance per cluster required by the services of the current suite.
     *
     * @param creekTestSuite the suite about to run; its Kafka options supply the docker image.
     */
    public void beforeSuite(CreekTestSuite creekTestSuite) {
        KafkaOptions options = TestOptionsAccessor.get(creekTestSuite);

        // Group the services by the cluster they need, so each cluster is created only once.
        Map<String, List<ConfigurableServiceInstance>> servicesByCluster =
                this.api.tests().env().currentSuite().services().stream()
                        .flatMap(this::requiredClusters)
                        .collect(Collectors.groupingBy(
                                ClusterInstance::clusterName,
                                Collectors.mapping(ClusterInstance::service, Collectors.toList())));

        servicesByCluster.forEach((clusterName, services) -> createCluster(clusterName, services, options));
    }

    /**
     * Resets the published endpoints of every started cluster and stops the Kafka instances.
     *
     * <p>Every instance is asked to stop even if stopping an earlier one fails, and the set of
     * tracked instances is always cleared. The first failure, if any, is rethrown once all
     * instances have been processed, with later failures attached as suppressed exceptions.
     *
     * @param creekTestSuite the suite that has finished.
     * @param testSuiteResult the result of the suite.
     */
    public void afterSuite(CreekTestSuite creekTestSuite, TestSuiteResult testSuiteResult) {
        try {
            this.kafkaInstances.keySet()
                    .forEach(clusterName -> this.clusterEndpointsProvider.put(clusterName, Map.of()));
            stopAll(this.kafkaInstances.values());
        } finally {
            // Never carry instances over into the next suite, even if shutdown failed.
            this.kafkaInstances.clear();
        }
    }

    /** Stops every instance, deferring any failure until all instances have been attempted. */
    private static void stopAll(Collection<ServiceInstance> instances) {
        RuntimeException firstFailure = null;
        for (ServiceInstance instance : instances) {
            try {
                instance.stop();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Finds the clusters a service needs.
     *
     * @param instance the service instance to inspect.
     * @return one {@link ClusterInstance} per cluster referenced by the topics of the service's
     *     descriptor, or an empty stream if the instance has no descriptor.
     */
    private Stream<ClusterInstance> requiredClusters(ConfigurableServiceInstance instance) {
        return instance.descriptor()
                .map(descriptor -> this.topicCollector.collectTopics(List.of(descriptor)).clusters().stream()
                        .map(clusterName -> new ClusterInstance(clusterName, instance)))
                .orElseGet(Stream::empty);
    }

    /**
     * Adds and starts a Kafka instance for a cluster, then wires it to the services and the endpoints provider.
     *
     * @param clusterName the name of the cluster.
     * @param services the service instances that use the cluster.
     * @param options the Kafka options supplying the docker image to run.
     */
    private void createCluster(String clusterName, Collection<ConfigurableServiceInstance> services, KafkaOptions options) {
        ServiceInstance kafka = this.api.tests().env().currentSuite().services()
                .add(new KafkaContainerDef(clusterName, options.kafkaDockerImage()));
        kafka.start();
        this.kafkaInstances.put(clusterName, kafka);

        setEnv(services, clusterName, kafka);

        // Publish the address built from the instance's test-network hostname and port.
        String bootstrapServers = kafka.testNetworkHostname() + ":" + kafka.testNetworkPort(KafkaContainerDef.TEST_NETWORK_PORT);
        this.clusterEndpointsProvider.put(clusterName, Map.of("bootstrap.servers", bootstrapServers));
    }

    /**
     * Sets the {@code KAFKA_<CLUSTER>_BOOTSTRAP_SERVERS} and {@code KAFKA_<CLUSTER>_APPLICATION_ID}
     * environment variables on each service that uses the cluster.
     *
     * @param services the service instances to configure.
     * @param clusterName the name of the cluster; upper-cased to form the variable name.
     * @param kafka the Kafka instance serving the cluster.
     * @throws java.util.NoSuchElementException if any of the services has no descriptor.
     */
    private void setEnv(Collection<ConfigurableServiceInstance> services, String clusterName, ServiceInstance kafka) {
        // Locale.ROOT keeps the variable name stable regardless of the host's default locale.
        String prefix = "KAFKA_" + clusterName.toUpperCase(Locale.ROOT) + "_";

        // NOTE(review): 9092 is presumably the port Kafka listens on within the service
        // network - confirm against KafkaContainerDef.
        services.forEach(service -> service.addEnv(prefix + "BOOTSTRAP_SERVERS", kafka.name() + ":9092"));

        // Done as a second pass so every service has its bootstrap servers set before any
        // missing descriptor causes a failure.
        services.forEach(service -> service.addEnv(prefix + "APPLICATION_ID", service.descriptor().orElseThrow().name()));
    }
}
