package io.quarkus.kafka.streams.runtime;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkus/kafka/streams/runtime/KafkaStreamsTopologyManager.class */
public class KafkaStreamsTopologyManager {
    private static final Logger LOGGER = Logger.getLogger(KafkaStreamsTopologyManager.class.getName());
    private final Admin adminClient;
    private final List<String> sourceTopics;
    private final List<Pattern> sourcePatterns;
    private final Duration topicsTimeout;
    private volatile boolean closed = false;

    public KafkaStreamsTopologyManager(Admin admin, Topology topology, KafkaStreamsRuntimeConfig kafkaStreamsRuntimeConfig) {
        this.adminClient = admin;
        this.topicsTimeout = kafkaStreamsRuntimeConfig.topicsTimeout();
        if (!isTopicsCheckEnabled()) {
            LOGGER.infof("Kafka Streams will not wait for topics to be created", new Object[0]);
            this.sourceTopics = Collections.emptyList();
            this.sourcePatterns = Collections.emptyList();
            return;
        }
        if (kafkaStreamsRuntimeConfig.topics().isEmpty() && kafkaStreamsRuntimeConfig.topicPatterns().isEmpty()) {
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            extractSources(topology, hashSet, hashSet2);
            this.sourceTopics = new ArrayList(hashSet);
            this.sourcePatterns = new ArrayList(hashSet2);
            LOGGER.infof("Kafka Streams will wait for topics: %s and topics matching patterns: %s to be created", this.sourceTopics, this.sourcePatterns);
        } else {
            this.sourceTopics = kafkaStreamsRuntimeConfig.topics().orElse(Collections.emptyList());
            this.sourcePatterns = kafkaStreamsRuntimeConfig.topicPatterns().orElse(Collections.emptyList()).stream().map(Pattern::compile).toList();
        }
        if (this.sourceTopics.isEmpty() && this.sourcePatterns.isEmpty()) {
            throw new IllegalArgumentException("No topics or topic patterns specified; cannot wait for topics to be created, in order to disable topics creation check set `quarkus.kafka-streams.topics-check-timeout=0`");
        }
    }

    public boolean isTopicsCheckEnabled() {
        return this.topicsTimeout.compareTo(Duration.ZERO) > 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.closed = true;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public static void extractSources(Topology topology, Set<String> set, Set<Pattern> set2) {
        HashSet hashSet = new HashSet();
        TopologyDescription describe = topology.describe();
        Iterator it = describe.globalStores().iterator();
        while (it.hasNext()) {
            TopologyDescription.Source source = ((TopologyDescription.GlobalStore) it.next()).source();
            if (source.topicPattern() != null) {
                set2.add(source.topicPattern());
            }
            if (source.topicSet() != null) {
                set.addAll(source.topicSet());
            }
        }
        Iterator it2 = describe.subtopologies().iterator();
        while (it2.hasNext()) {
            for (TopologyDescription.Source source2 : ((TopologyDescription.Subtopology) it2.next()).nodes()) {
                if (source2 instanceof TopologyDescription.Sink) {
                    TopologyDescription.Sink sink = (TopologyDescription.Sink) source2;
                    if (sink.topic() != null) {
                        hashSet.add(sink.topic());
                    }
                } else if (source2 instanceof TopologyDescription.Source) {
                    TopologyDescription.Source source3 = source2;
                    if (source3.topicPattern() != null) {
                        set2.add(source3.topicPattern());
                    }
                    if (source3.topicSet() != null) {
                        set.addAll(source3.topicSet());
                    }
                }
            }
        }
        set.removeAll(hashSet);
    }

    public List<String> getSourceTopics() {
        return this.sourceTopics;
    }

    public List<Pattern> getSourcePatterns() {
        return this.sourcePatterns;
    }

    public Set<String> getMissingTopics() throws InterruptedException {
        if (!isTopicsCheckEnabled()) {
            return Collections.emptySet();
        }
        LinkedHashSet linkedHashSet = new LinkedHashSet(this.sourceTopics);
        try {
            Set set = (Set) this.adminClient.listTopics().names().get(this.topicsTimeout.toMillis(), TimeUnit.MILLISECONDS);
            linkedHashSet.removeAll(set);
            linkedHashSet.addAll(this.sourcePatterns.stream().filter(pattern -> {
                return set.stream().noneMatch(pattern.asPredicate());
            }).map((v0) -> {
                return v0.pattern();
            }).toList());
        } catch (ExecutionException | TimeoutException e) {
            LOGGER.error("Failed to get topic names from broker", e);
        }
        return linkedHashSet;
    }

    public void waitForTopicsToBeCreated() throws InterruptedException {
        Set set;
        if (isTopicsCheckEnabled()) {
            Object obj = null;
            while (!this.closed) {
                try {
                    try {
                        set = (Set) this.adminClient.listTopics().names().get(this.topicsTimeout.toMillis(), TimeUnit.MILLISECONDS);
                    } catch (ExecutionException | TimeoutException e) {
                        LOGGER.error("Failed to get topic names from broker", e);
                        Thread.sleep(1000L);
                    }
                    if (set.containsAll(this.sourceTopics) && this.sourcePatterns.stream().allMatch(pattern -> {
                        return set.stream().anyMatch(pattern.asPredicate());
                    })) {
                        LOGGER.debugf("All expected topics %s and topics matching patterns %s ", this.sourceTopics, this.sourcePatterns);
                        Thread.sleep(1000L);
                        return;
                    }
                    HashSet hashSet = new HashSet(this.sourceTopics);
                    hashSet.removeAll(set);
                    if (hashSet.equals(obj)) {
                        LOGGER.debug("Waiting for topic(s) to be created: " + String.valueOf(hashSet));
                    } else {
                        LOGGER.warn("Waiting for topic(s) to be created: " + String.valueOf(hashSet));
                        obj = hashSet;
                    }
                    Thread.sleep(1000L);
                } catch (Throwable th) {
                    Thread.sleep(1000L);
                    throw th;
                }
            }
        }
    }
}
