package io.streamthoughts.azkarra.api.streams.admin;

import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.time.SystemTime;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.KafkaFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/admin/AdminClientUtils.class */
public class AdminClientUtils {
    private static final Logger LOG = LoggerFactory.getLogger(AdminClientUtils.class);

    public static AdminClient newAdminClient(Conf conf) {
        return AdminClient.create(getClientConfig(conf));
    }

    private static Properties getClientConfig(Conf conf) {
        Properties properties = new Properties();
        for (String str : AdminClientConfig.configNames()) {
            if (conf.hasPath(str)) {
                properties.put(str, conf.getString(str));
            }
        }
        return properties;
    }

    public static void waitForTopicToExist(AdminClient adminClient, Set<String> set) throws InterruptedException {
        Set<String> set2 = set;
        LOG.debug("Checking for topic(s) to be created: {}", set2);
        while (true) {
            set2 = checkTopicsMissing(adminClient, set2);
            if (set2.isEmpty()) {
                return;
            }
            LOG.debug("Waiting for topic(s) to be created: {}", set2);
            SystemTime.SYSTEM.sleep(Duration.ofSeconds(1L));
        }
    }

    public static CompletableFuture<Collection<TopicListing>> listTopics(AdminClient adminClient) {
        Objects.requireNonNull(adminClient, "client cannot be null");
        KafkaFuture listings = adminClient.listTopics().listings();
        return CompletableFuture.supplyAsync(() -> {
            try {
                return (Collection) listings.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private static Set<String> checkTopicsMissing(AdminClient adminClient, Set<String> set) throws InterruptedException {
        try {
            Set set2 = (Set) adminClient.listTopics().names().get(5L, TimeUnit.SECONDS);
            if (set2.containsAll(set)) {
                return Collections.emptySet();
            }
            HashSet hashSet = new HashSet(set);
            hashSet.removeAll(set2);
            return hashSet;
        } catch (ExecutionException | TimeoutException e) {
            LOG.error("Error while listing topics from broker: {}", e.getMessage());
            return set;
        }
    }
}
