package io.streamthoughts.azkarra.api.streams;

import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.model.TimestampedValue;
import io.streamthoughts.azkarra.api.monad.Try;
import io.streamthoughts.azkarra.api.query.LocalStoreAccessor;
import io.streamthoughts.azkarra.api.streams.consumer.ConsumerClientOffsets;
import io.streamthoughts.azkarra.api.streams.consumer.ConsumerGroupOffsets;
import io.streamthoughts.azkarra.api.streams.consumer.GlobalConsumerOffsetsRegistry;
import io.streamthoughts.azkarra.api.streams.consumer.LogOffsetsFetcher;
import io.streamthoughts.azkarra.api.streams.internal.InternalStreamsLifecycleContext;
import io.streamthoughts.azkarra.api.streams.topology.TopologyContainer;
import io.streamthoughts.azkarra.api.streams.topology.TopologyMetadata;
import io.streamthoughts.azkarra.api.time.Time;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.TopologyDescription;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainer.class */
public class KafkaStreamsContainer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsContainer.class);
    private final KafkaStreamsFactory streamsFactory;
    private KafkaStreams kafkaStreams;
    private volatile Throwable lastObservedException;
    private volatile TimestampedValue<State> state;
    private final TopologyContainer topologyContainer;
    private final String applicationServer;
    private KafkaConsumer<byte[], byte[]> consumer;
    private Executor executor;
    private long started = -1;
    private volatile Set<ThreadMetadata> threadMetadata = Collections.emptySet();
    private final LinkedBlockingQueue<StateChangeWatcher> stateChangeWatchers = new LinkedBlockingQueue<>();

    /* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamsContainer$StateChangeWatcher.class */
    public interface StateChangeWatcher {
        default boolean accept(State state) {
            return true;
        }

        void onChange(StateChangeEvent stateChangeEvent);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaStreamsContainer(TopologyContainer topologyContainer, KafkaStreamsFactory kafkaStreamsFactory) {
        Objects.requireNonNull(topologyContainer, "topologyContainer cannot be null");
        Objects.requireNonNull(kafkaStreamsFactory, "streamsFactory cannot be null");
        setState(State.NOT_CREATED);
        this.streamsFactory = kafkaStreamsFactory;
        this.topologyContainer = topologyContainer;
        this.applicationServer = streamsConfig().getOptionalString("application.server").orElse(null);
    }

    public synchronized Future<KafkaStreams.State> start(Executor executor) {
        this.executor = executor;
        this.started = Time.SYSTEM.milliseconds();
        this.kafkaStreams = this.streamsFactory.make(this.topologyContainer.topology(), this.topologyContainer.streamsConfig());
        reset();
        setState(State.CREATED);
        return CompletableFuture.supplyAsync(() -> {
            LOG.info("Initializing KafkaStreams container (application.id={})", applicationId());
            new InternalStreamsLifeCycleChain(this.topologyContainer.interceptors().iterator(), (streamsLifecycleInterceptor, streamsLifecycleChain) -> {
                streamsLifecycleInterceptor.onStart(new InternalStreamsLifecycleContext(this), streamsLifecycleChain);
            }, () -> {
                try {
                    LOG.info("Starting KafkaStreams (application.id={})", applicationId());
                    this.kafkaStreams.start();
                } catch (StreamsException e) {
                    this.lastObservedException = e;
                    throw e;
                }
            }).execute();
            KafkaStreams.State state = this.kafkaStreams.state();
            LOG.info("Completed KafkaStreams initialization (application.id={}, state={})", applicationId(), state);
            return state;
        }, executor);
    }

    private void reset() {
        this.lastObservedException = null;
    }

    public void setState(State state) {
        this.state = new TimestampedValue<>(state);
    }

    public TimestampedValue<State> state() {
        return this.state;
    }

    public Optional<Serde> getDefaultKeySerde() {
        return !streamsConfig().hasPath("default.key.serde") ? Optional.empty() : Try.failable(() -> {
            return (Serde) streamsConfig().getClass("default.key.serde", Serde.class);
        }).toOptional();
    }

    public Set<ThreadMetadata> threadMetadata() {
        return this.threadMetadata;
    }

    public long startedSince() {
        return this.started;
    }

    public Conf streamsConfig() {
        return this.topologyContainer.streamsConfig();
    }

    public String applicationId() {
        return streamsConfig().getString("application.id");
    }

    public String applicationServer() {
        return this.applicationServer;
    }

    public Optional<Throwable> exception() {
        return Optional.ofNullable(this.lastObservedException);
    }

    public TopologyMetadata topologyMetadata() {
        return this.topologyContainer.metadata();
    }

    public TopologyDescription topologyDescription() {
        return this.topologyContainer.description();
    }

    public Map<MetricName, ? extends Metric> metrics() {
        return this.kafkaStreams.metrics();
    }

    public ConsumerGroupOffsets offsets() {
        ConsumerGroupOffsets snapshot = GlobalConsumerOffsetsRegistry.getInstance().offsetsFor(applicationId()).snapshot();
        Set set = (Set) threadMetadata().stream().flatMap(threadMetadata -> {
            return threadMetadata.activeTasks().stream();
        }).flatMap(taskMetadata -> {
            return taskMetadata.topicPartitions().stream();
        }).collect(Collectors.toSet());
        Map<TopicPartition, Long> fetchLogEndOffsetsFor = LogOffsetsFetcher.fetchLogEndOffsetsFor(getConsumer(), set);
        Map<TopicPartition, Long> fetchLogStartOffsetsFor = LogOffsetsFetcher.fetchLogStartOffsetsFor(getConsumer(), set);
        return new ConsumerGroupOffsets(snapshot.group(), (Set) snapshot.consumers().stream().map(consumerClientOffsets -> {
            return new ConsumerClientOffsets(consumerClientOffsets.clientId(), consumerClientOffsets.streamThread(), (Set) consumerClientOffsets.positions().stream().map(consumerLogOffsets -> {
                if (set.contains(consumerLogOffsets.topicPartition())) {
                    return consumerLogOffsets.logEndOffset((Long) fetchLogEndOffsetsFor.get(consumerLogOffsets.topicPartition())).logStartOffset((Long) fetchLogStartOffsetsFor.get(consumerLogOffsets.topicPartition()));
                }
                return null;
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toSet()));
        }).collect(Collectors.toSet()));
    }

    public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
        String str = (String) map.get("client.id");
        if (str == null) {
            str = streamsConfig().getOptionalString("client.id").orElse(applicationId()) + "-" + UUID.randomUUID() + "-producer";
        }
        Map<String, Object> producerConfigs = getProducerConfigs(streamsConfig().getConfAsMap());
        producerConfigs.putAll(map);
        producerConfigs.put("bootstrap.servers", streamsConfig().getString("bootstrap.servers"));
        producerConfigs.put("client.id", str);
        return new KafkaProducer(producerConfigs, new ByteArraySerializer(), new ByteArraySerializer());
    }

    private synchronized Consumer<byte[], byte[]> getConsumer() {
        if (this.consumer == null) {
            String str = streamsConfig().getOptionalString("client.id").orElse(applicationId()) + "-" + UUID.randomUUID() + "-consumer";
            Map<String, Object> consumerConfigs = getConsumerConfigs(streamsConfig().getConfAsMap());
            consumerConfigs.put("bootstrap.servers", streamsConfig().getString("bootstrap.servers"));
            consumerConfigs.put("client.id", str);
            consumerConfigs.remove("group.id");
            this.consumer = new KafkaConsumer<>(consumerConfigs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        }
        return this.consumer;
    }

    public void close(Duration duration) {
        close(false, duration);
    }

    public void close(boolean z, Duration duration) {
        if (z) {
            reset();
        }
        Thread thread = new Thread(() -> {
            LOG.info("Closing KafkaStreams container (application.id={})", applicationId());
            new InternalStreamsLifeCycleChain(this.topologyContainer.interceptors().iterator(), (streamsLifecycleInterceptor, streamsLifecycleChain) -> {
                streamsLifecycleInterceptor.onStop(new InternalStreamsLifecycleContext(this), streamsLifecycleChain);
            }, () -> {
                this.kafkaStreams.close();
                if (z) {
                    LOG.info("Cleanup local states (application.id={})", applicationId());
                    this.kafkaStreams.cleanUp();
                }
                LOG.info("KafkaStreams closed completely (application.id={})", applicationId());
            }).execute();
        }, "kafka-streams-container-close-thread");
        thread.setDaemon(true);
        thread.start();
        long millis = duration.toMillis();
        if (millis > 0) {
            try {
                thread.join(millis);
            } catch (InterruptedException e) {
                LOG.debug("Cannot transit to {} within {}ms", KafkaStreams.State.NOT_RUNNING, Long.valueOf(millis));
            }
        }
    }

    public void restart() {
        if (isNotRunning()) {
            restartNow();
        } else {
            this.stateChangeWatchers.add(new StateChangeWatcher() { // from class: io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer.1
                @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer.StateChangeWatcher
                public boolean accept(State state) {
                    return state == State.NOT_RUNNING;
                }

                @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsContainer.StateChangeWatcher
                public void onChange(StateChangeEvent stateChangeEvent) {
                    KafkaStreamsContainer.this.restartNow();
                }
            });
            close(false, Duration.ZERO);
        }
    }

    private void restartNow() {
        ((CompletableFuture) start(this.executor)).handle((state, th) -> {
            if (th != null) {
                LOG.error("Unexpected error happens while restarting streams", th);
            }
            return state;
        });
    }

    public Optional<StreamsServerInfo> getLocalServerInfo() {
        return getAllMetadata().stream().filter((v0) -> {
            return v0.isLocal();
        }).findFirst();
    }

    public Set<StreamsServerInfo> getAllMetadata() {
        return isNotRunning() ? Collections.emptySet() : (Set) this.kafkaStreams.allMetadata().stream().map(this::newServerInfoFor).collect(Collectors.toSet());
    }

    public Collection<StreamsServerInfo> getAllMetadataForStore(String str) {
        Objects.requireNonNull(str, "storeName cannot be null");
        return (Collection) this.kafkaStreams.allMetadataForStore(str).stream().map(this::newServerInfoFor).collect(Collectors.toList());
    }

    public <K> Optional<StreamsServerInfo> getMetadataForStoreAndKey(String str, K k, Serializer<K> serializer) {
        Objects.requireNonNull(str, "storeName cannot be null");
        Objects.requireNonNull(k, "key cannot be null");
        Objects.requireNonNull(serializer, "keySerializer cannot be null");
        StreamsMetadata metadataForKey = this.kafkaStreams.metadataForKey(str, k, serializer);
        return (metadataForKey == null || metadataForKey.equals(StreamsMetadata.NOT_AVAILABLE)) ? Optional.empty() : Optional.of(newServerInfoFor(metadataForKey));
    }

    public <K, V> LocalStoreAccessor<ReadOnlyKeyValueStore<K, V>> getLocalKeyValueStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.keyValueStore());
    }

    public <K, V> LocalStoreAccessor<ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>>> getLocalTimestampedKeyValueStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.timestampedKeyValueStore());
    }

    public <K, V> LocalStoreAccessor<ReadOnlyWindowStore<K, V>> getLocalWindowStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.windowStore());
    }

    public <K, V> LocalStoreAccessor<ReadOnlyWindowStore<K, ValueAndTimestamp<V>>> getLocalTimestampedWindowStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.timestampedWindowStore());
    }

    public <K, V> LocalStoreAccessor<ReadOnlySessionStore<K, V>> getLocalSessionStore(String str) {
        return getLocalStoreAccess(str, QueryableStoreTypes.sessionStore());
    }

    private <T> LocalStoreAccessor<T> getLocalStoreAccess(String str, QueryableStoreType<T> queryableStoreType) {
        return new LocalStoreAccessor<>(() -> {
            return this.kafkaStreams.store(str, queryableStoreType);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Logger logger() {
        return LOG;
    }

    public boolean isNotRunning() {
        KafkaStreams.State state = this.kafkaStreams.state();
        return (state.equals(KafkaStreams.State.RUNNING) || state.equals(KafkaStreams.State.REBALANCING)) ? false : true;
    }

    public void addStateChangeWatcher(StateChangeWatcher stateChangeWatcher) {
        this.stateChangeWatchers.add((StateChangeWatcher) Objects.requireNonNull(stateChangeWatcher, "Cannot register null watcher"));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stateChanges(StateChangeEvent stateChangeEvent) {
        this.state = new TimestampedValue<>(stateChangeEvent.timestamp(), stateChangeEvent.newState());
        if (this.state.value() == State.RUNNING) {
            this.threadMetadata = this.kafkaStreams.localThreadsMetadata();
        } else {
            this.threadMetadata = Collections.emptySet();
        }
        if (this.stateChangeWatchers.isEmpty()) {
            return;
        }
        ArrayList<StateChangeWatcher> arrayList = new ArrayList(this.stateChangeWatchers.size());
        this.stateChangeWatchers.drainTo(arrayList);
        for (StateChangeWatcher stateChangeWatcher : arrayList) {
            if (stateChangeWatcher.accept(stateChangeEvent.newState())) {
                stateChangeWatcher.onChange(stateChangeEvent);
            } else {
                this.stateChangeWatchers.add(stateChangeWatcher);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setException(Throwable th) {
        this.lastObservedException = th;
    }

    private StreamsServerInfo newServerInfoFor(StreamsMetadata streamsMetadata) {
        return new StreamsServerInfo(applicationId(), streamsMetadata.host(), streamsMetadata.port(), streamsMetadata.stateStoreNames(), groupByTopicThenGet(streamsMetadata.topicPartitions()), isLocal(streamsMetadata));
    }

    private boolean isLocal(StreamsMetadata streamsMetadata) {
        return (streamsMetadata.host() + ":" + streamsMetadata.port()).equals(this.applicationServer);
    }

    private static Set<TopicPartitions> groupByTopicThenGet(Set<TopicPartition> set) {
        return (Set) ((Map) set.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.topic();
        }))).entrySet().stream().map(entry -> {
            return new TopicPartitions((String) entry.getKey(), (Set) ((List) entry.getValue()).stream().map((v0) -> {
                return v0.partition();
            }).collect(Collectors.toSet()));
        }).collect(Collectors.toSet());
    }

    private static Map<String, Object> getConsumerConfigs(Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        for (String str : ConsumerConfig.configNames()) {
            if (map.containsKey(str)) {
                hashMap.put(str, map.get(str));
            }
        }
        return hashMap;
    }

    private static Map<String, Object> getProducerConfigs(Map<String, Object> map) {
        HashMap hashMap = new HashMap();
        for (String str : ProducerConfig.configNames()) {
            if (map.containsKey(str)) {
                hashMap.put(str, map.get(str));
            }
        }
        return hashMap;
    }

    public KafkaStreams getKafkaStreams() {
        if (this.kafkaStreams == null) {
            throw new IllegalStateException("Cannot get access to KafkaStreams, instance is not created yet.");
        }
        return this.kafkaStreams;
    }
}
