package io.streamthoughts.azkarra.api.streams;

import io.streamthoughts.azkarra.api.annotations.VisibleForTesting;
import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.model.TimestampedValue;
import io.streamthoughts.azkarra.api.monad.Try;
import io.streamthoughts.azkarra.api.query.LocalStoreAccessor;
import io.streamthoughts.azkarra.api.streams.admin.AdminClientUtils;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.api.time.Time;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainer.class */
/**
 * Container that owns a {@link KafkaStreams} instance created through a {@link KafkaStreamsFactory}
 * and exposes its state, metadata, metrics and local state stores.
 *
 * <p>The wrapped instance is (re)created on every call to {@link #start(Executor, boolean)}; it is
 * therefore {@code null} until the container has been started at least once.
 */
public class KafkaStreamsContainer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsContainer.class);
    private static final String INTERNAL_REPARTITIONING_TOPIC_SUFFIX = "-repartition";
    private static final Pattern INTERNAL_REPARTITIONING_NAME_PATTERN = Pattern.compile(".*-[0-9]{10}-repartition$");

    private final KafkaStreamsFactory streamsFactory;
    private final Conf conf;
    private final TopologyMetadata topologyMetadata;
    private final String applicationServer;
    private final LinkedBlockingQueue<StateChangeWatcher> stateChangeWatchers = new LinkedBlockingQueue<>();

    // Re-assigned on every (re)start and read from other threads, hence volatile like the
    // other mutable fields below.
    private volatile KafkaStreams kafkaStreams;
    private volatile Throwable lastObservedException;
    private volatile TimestampedValue<State> state;
    private volatile Set<ThreadMetadata> threadMetadata = Collections.emptySet();
    private Executor executor;
    private long started = -1;

    /**
     * Callback registered by {@link #restart()} and evaluated on each state change.
     */
    private interface StateChangeWatcher {
        /**
         * @param state the state the streams instance has just moved to.
         * @return {@code true} if {@link #apply()} must be invoked for that state.
         */
        boolean accept(KafkaStreams.State state);

        /** Runs the action attached to this watcher. */
        void apply();
    }

    /**
     * Creates a new container in the {@code NOT_CREATED} state.
     *
     * @param topologyMetadata    the metadata of the topology to run.
     * @param conf                the streams configuration; must not be {@code null}.
     * @param kafkaStreamsFactory the factory used to build the {@link KafkaStreams} instance;
     *                            must not be {@code null}.
     */
    public KafkaStreamsContainer(TopologyMetadata topologyMetadata, Conf conf, KafkaStreamsFactory kafkaStreamsFactory) {
        Objects.requireNonNull(conf, "streamsConfig cannot be null");
        Objects.requireNonNull(kafkaStreamsFactory, "streamsFactory cannot be null");
        setState(State.NOT_CREATED);
        this.streamsFactory = kafkaStreamsFactory;
        this.topologyMetadata = topologyMetadata;
        this.conf = conf;
        this.applicationServer = this.conf.getOptionalString("application.server").orElse(null);
    }

    /**
     * Creates a new {@link KafkaStreams} instance and starts it asynchronously.
     *
     * @param executor                the executor running the start-up task; must not be {@code null}.
     * @param waitForTopicToBeCreated when {@code true}, the task first waits for the source topics
     *                                of the topology to exist before starting the streams.
     * @return a future completed with the state of the streams instance right after
     *         {@link KafkaStreams#start()} returned.
     * @throws NullPointerException if {@code executor} is {@code null}.
     */
    public synchronized Future<KafkaStreams.State> start(Executor executor, boolean waitForTopicToBeCreated) {
        // Fail before any side effect: a null executor would otherwise only be rejected by
        // supplyAsync, after a new KafkaStreams instance has already been built.
        Objects.requireNonNull(executor, "executor cannot be null");
        this.executor = executor;
        this.started = Time.SYSTEM.milliseconds();
        // Capture the instance so that the asynchronous task always starts the streams it was
        // created for, even if the field is re-assigned in the meantime.
        final KafkaStreams streams = this.streamsFactory.make(this);
        this.kafkaStreams = streams;
        return CompletableFuture.supplyAsync(() -> {
            if (waitForTopicToBeCreated) {
                waitForSourceTopics();
            }
            streams.start();
            return streams.state();
        }, executor);
    }

    /**
     * Blocks until all external source topics of the topology exist. Does nothing when the
     * topology declares no such topic.
     */
    private void waitForSourceTopics() {
        final Set<String> sourceTopics = getSourceTopics(this.topologyMetadata.topology());
        if (sourceTopics.isEmpty()) {
            return;
        }
        setState(State.WAITING_FOR_TOPICS);
        // try-with-resources guarantees the admin client is closed even when waiting fails.
        try (AdminClient client = AdminClientUtils.newAdminClient(this.conf)) {
            LOG.info("Waiting for source topic(s) to be created: {}", sourceTopics);
            AdminClientUtils.waitForTopicToExist(client, sourceTopics);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the caller still proceeds with starting the streams.
            Thread.currentThread().interrupt();
        }
    }

    private void setState(State state) {
        this.state = new TimestampedValue<>(state);
    }

    /**
     * Collects the topics read by the given topology: every global-store source topic, plus the
     * topics of each sub-topology source node that are not considered internal
     * (see {@link #isInternalTopics(String)}).
     *
     * @param topologyDescription the description of the topology to inspect.
     * @return a mutable set of topic names; never {@code null}.
     */
    @VisibleForTesting
    Set<String> getSourceTopics(TopologyDescription topologyDescription) {
        final Set<String> topics = new HashSet<>();
        topologyDescription.globalStores().forEach(globalStore -> {
            final Set<String> storeTopics = globalStore.source().topicSet();
            if (storeTopics != null) {
                topics.addAll(storeTopics);
            }
        });
        topologyDescription.subtopologies().forEach(subtopology ->
            subtopology.nodes().forEach(node -> {
                if (node instanceof TopologyDescription.Source) {
                    // topicSet() is null when the source subscribes with a pattern: there is
                    // no fixed topic name to wait for in that case.
                    final Set<String> nodeTopics = ((TopologyDescription.Source) node).topicSet();
                    if (nodeTopics != null) {
                        nodeTopics.stream()
                            .filter(Predicate.not(this::isInternalTopics))
                            .forEach(topics::add);
                    }
                }
            })
        );
        return topics;
    }

    /**
     * @param topic the topic name to test.
     * @return {@code true} if the name starts with the application id, or looks like a
     *         repartition topic (matches the repartition pattern or ends with "-repartition").
     */
    private boolean isInternalTopics(String topic) {
        return topic.startsWith(applicationId())
            || INTERNAL_REPARTITIONING_NAME_PATTERN.matcher(topic).matches()
            || topic.endsWith(INTERNAL_REPARTITIONING_TOPIC_SUFFIX);
    }

    /**
     * @return the current container state together with the time it was recorded.
     */
    public TimestampedValue<State> state() {
        return this.state;
    }

    /**
     * Resolves the {@code default.key.serde} entry of the configuration.
     *
     * @return an empty optional when the entry is not configured; otherwise the result of
     *         resolving it through {@code Try#toOptional()} (presumably empty when the
     *         resolution fails — confirm against {@code Try}).
     */
    public Optional<Serde> getDefaultKeySerde() {
        if (!this.conf.hasPath("default.key.serde")) {
            return Optional.empty();
        }
        return Try.failable(() -> {
            return (Serde) this.conf.getClass("default.key.serde", Serde.class);
        }).toOptional();
    }

    /**
     * @return the metadata of the local stream threads captured on the last transition to
     *         {@code RUNNING}; an empty set in any other state or before the first state change.
     */
    public Set<ThreadMetadata> threadMetadata() {
        return this.threadMetadata;
    }

    /**
     * @return the time, in milliseconds, at which {@link #start(Executor, boolean)} was last
     *         invoked, or {@code -1} if the container was never started.
     */
    public long startedSince() {
        return this.started;
    }

    /**
     * @return the configuration this container was created with.
     */
    public Conf streamsConfig() {
        return this.conf;
    }

    /**
     * @return the value of the {@code application.id} configuration entry.
     */
    public String applicationId() {
        return this.conf.getString("application.id");
    }

    /**
     * @return the last exception recorded through {@link #setException(Throwable)}, if any.
     */
    public Optional<Throwable> exception() {
        return Optional.ofNullable(this.lastObservedException);
    }

    /**
     * @return the topology metadata this container was created with.
     */
    public TopologyMetadata topologyMetadata() {
        return this.topologyMetadata;
    }

    /**
     * @return the metrics of the wrapped streams instance, or an empty map if the container
     *         has not been started yet.
     */
    public Map<MetricName, ? extends Metric> metrics() {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams == null) {
            return Collections.emptyMap();
        }
        return streams.metrics();
    }

    /**
     * Closes the wrapped streams instance without cleaning up its local state.
     */
    public void close() {
        close(false);
    }

    /**
     * Closes the wrapped streams instance. Does nothing if the container was never started.
     *
     * @param cleanUp when {@code true}, {@link KafkaStreams#cleanUp()} is invoked after closing.
     */
    public void close(boolean cleanUp) {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams == null) {
            return;
        }
        streams.close();
        if (cleanUp) {
            streams.cleanUp();
        }
    }

    /**
     * Restarts the streams. When the container is neither {@code RUNNING} nor
     * {@code REBALANCING} the restart happens immediately; otherwise the streams instance is
     * closed and the restart is deferred until it reports {@code NOT_RUNNING}.
     */
    public void restart() {
        if (isNotRunning()) {
            restartNow();
            return;
        }
        // Register the watcher before closing so the NOT_RUNNING transition cannot be missed.
        this.stateChangeWatchers.add(new StateChangeWatcher() {
            @Override
            public boolean accept(KafkaStreams.State newState) {
                return newState == KafkaStreams.State.NOT_RUNNING;
            }

            @Override
            public void apply() {
                KafkaStreamsContainer.this.restartNow();
            }
        });
        close(false);
    }

    /**
     * Starts the streams again on the executor used for the previous start, logging any
     * failure of the asynchronous start-up.
     */
    private void restartNow() {
        // start() always hands back the CompletableFuture built by supplyAsync.
        final CompletableFuture<KafkaStreams.State> future =
            (CompletableFuture<KafkaStreams.State>) start(this.executor, false);
        future.handle((result, error) -> {
            if (error != null) {
                LOG.error("Unexpected error happens while restarting streams", error);
            }
            return result;
        });
    }

    /**
     * @return the first entry of {@link #getAllMetadata()} flagged as local, if any.
     */
    public Optional<StreamsServerInfo> getLocalServerInfo() {
        return getAllMetadata().stream()
            .filter(StreamsServerInfo::isLocal)
            .findFirst();
    }

    /**
     * @return the server information of all known streams instances, or an empty set when the
     *         container is neither {@code RUNNING} nor {@code REBALANCING}.
     */
    public Set<StreamsServerInfo> getAllMetadata() {
        if (isNotRunning()) {
            return Collections.emptySet();
        }
        return this.kafkaStreams.allMetadata().stream()
            .map(this::newServerInfoFor)
            .collect(Collectors.toSet());
    }

    /**
     * @param storeName the name of the state store; must not be {@code null}.
     * @return the server information of the streams instances reported by
     *         {@link KafkaStreams#allMetadataForStore(String)} for that store.
     */
    public Collection<StreamsServerInfo> getAllMetadataForStore(String storeName) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        return this.kafkaStreams.allMetadataForStore(storeName).stream()
            .map(this::newServerInfoFor)
            .collect(Collectors.toList());
    }

    /**
     * @param storeName     the name of the state store; must not be {@code null}.
     * @param key           the key to locate; must not be {@code null}.
     * @param keySerializer the serializer for the key; must not be {@code null}.
     * @param <K>           the key type.
     * @return the server information built from the metadata that
     *         {@code KafkaStreams#metadataForKey} reports for the store and key.
     */
    public <K> StreamsServerInfo getMetadataForStoreAndKey(String storeName, K key, Serializer<K> keySerializer) {
        Objects.requireNonNull(storeName, "storeName cannot be null");
        Objects.requireNonNull(key, "key cannot be null");
        Objects.requireNonNull(keySerializer, "keySerializer cannot be null");
        return newServerInfoFor(this.kafkaStreams.metadataForKey(storeName, key, keySerializer));
    }

    /**
     * @param storeName the name of the key-value store.
     * @return an accessor to the local key-value store with that name.
     */
    public <K, V> LocalStoreAccessor<ReadOnlyKeyValueStore<K, V>> getLocalKeyValueStore(String storeName) {
        return getLocalStoreAccess(storeName, QueryableStoreTypes.keyValueStore());
    }

    /**
     * @param storeName the name of the window store.
     * @return an accessor to the local window store with that name.
     */
    public <K, V> LocalStoreAccessor<ReadOnlyWindowStore<K, V>> getLocalWindowStore(String storeName) {
        return getLocalStoreAccess(storeName, QueryableStoreTypes.windowStore());
    }

    /**
     * @param storeName the name of the session store.
     * @return an accessor to the local session store with that name.
     */
    public <K, V> LocalStoreAccessor<ReadOnlySessionStore<K, V>> getLocalSessionStore(String storeName) {
        return getLocalStoreAccess(storeName, QueryableStoreTypes.sessionStore());
    }

    /**
     * @param storeName          the name of the state store.
     * @param queryableStoreType the type of the store to access.
     * @param <T>                the store type.
     * @return an accessor wrapping a supplier that looks the store up on the current streams
     *         instance each time the supplier is invoked.
     */
    public <T> LocalStoreAccessor<T> getLocalStoreAccess(String storeName, QueryableStoreType<T> queryableStoreType) {
        return new LocalStoreAccessor<>(() -> this.kafkaStreams.store(storeName, queryableStoreType));
    }

    /**
     * @return {@code true} unless the container state is {@code RUNNING} or {@code REBALANCING}.
     */
    boolean isNotRunning() {
        final State current = state().value();
        return current != State.RUNNING && current != State.REBALANCING;
    }

    /**
     * Records a state transition of the wrapped streams instance and triggers the pending
     * watchers that accept the new state.
     *
     * @param timestamp the timestamp stored alongside the new state.
     * @param newState  the state the streams instance moved to.
     * @param oldState  unused here; presumably the previous state — confirm against the caller.
     */
    public void stateChanges(long timestamp, KafkaStreams.State newState, KafkaStreams.State oldState) {
        this.state = new TimestampedValue<>(timestamp, State.valueOf(newState.name()));
        if (newState == KafkaStreams.State.RUNNING) {
            this.threadMetadata = this.kafkaStreams.localThreadsMetadata();
        } else {
            this.threadMetadata = Collections.emptySet();
        }
        if (this.stateChangeWatchers.isEmpty()) {
            return;
        }
        // Drain first so that each watcher is evaluated exactly once for this transition;
        // those that do not accept the new state are queued again for the next one.
        final List<StateChangeWatcher> pending = new ArrayList<>(this.stateChangeWatchers.size());
        this.stateChangeWatchers.drainTo(pending);
        for (StateChangeWatcher watcher : pending) {
            if (watcher.accept(newState)) {
                watcher.apply();
            } else {
                this.stateChangeWatchers.add(watcher);
            }
        }
    }

    /**
     * Records the last exception observed for this container.
     *
     * @param throwable the exception to expose through {@link #exception()}.
     */
    public void setException(Throwable throwable) {
        this.lastObservedException = throwable;
    }

    private StreamsServerInfo newServerInfoFor(StreamsMetadata metadata) {
        return new StreamsServerInfo(
            applicationId(),
            metadata.host(),
            metadata.port(),
            metadata.stateStoreNames(),
            groupByTopicThenGet(metadata.topicPartitions()),
            isLocal(metadata)
        );
    }

    /**
     * @return {@code true} if {@code host:port} of the metadata equals the configured
     *         {@code application.server}.
     */
    private boolean isLocal(StreamsMetadata metadata) {
        return (metadata.host() + ":" + metadata.port()).equals(this.applicationServer);
    }

    /**
     * Groups the given topic-partitions by topic name.
     *
     * @param topicPartitions the topic-partitions to group.
     * @return one {@link TopicPartitions} per distinct topic, holding its partition numbers.
     */
    private static Set<TopicPartitions> groupByTopicThenGet(Set<TopicPartition> topicPartitions) {
        final Map<String, Set<Integer>> partitionsByTopic = topicPartitions.stream()
            .collect(Collectors.groupingBy(
                TopicPartition::topic,
                Collectors.mapping(TopicPartition::partition, Collectors.toSet())));
        return partitionsByTopic.entrySet().stream()
            .map(entry -> new TopicPartitions(entry.getKey(), entry.getValue()))
            .collect(Collectors.toSet());
    }
}
