package io.streamthoughts.azkarra.api.streams;

import io.streamthoughts.azkarra.api.config.Conf;
import io.streamthoughts.azkarra.api.streams.listener.CompositeStateListener;
import io.streamthoughts.azkarra.api.streams.listener.CompositeStateRestoreListener;
import io.streamthoughts.azkarra.api.streams.listener.CompositeUncaughtExceptionHandler;
import io.streamthoughts.azkarra.api.streams.rocksdb.DefaultRocksDBConfigSetter;
import io.streamthoughts.azkarra.api.streams.topology.TopologyContainer;
import io.streamthoughts.azkarra.api.time.Time;
import java.lang.Thread;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.StateRestoreListener;

/* loaded from: input_file:io/streamthoughts/azkarra/api/streams/KafkaStreamContainerBuilder.class */
/**
 * Fluent builder for {@link KafkaStreamsContainer} instances.
 *
 * <p>The builder wraps the user-supplied {@link KafkaStreamsFactory} so that every
 * {@link KafkaStreams} instance it creates is wired with the configured state listeners,
 * state-restore listeners and uncaught-exception handlers, plus internal callbacks that
 * report state changes and uncaught exceptions to the owning container.
 *
 * <p>Listener lists passed as {@code null} are treated as empty lists.
 */
public class KafkaStreamContainerBuilder {
    private TopologyContainer topologyContainer;
    private KafkaStreamsFactory kafkaStreamsFactory;
    private List<StateRestoreListener> restoreListeners = Collections.emptyList();
    private List<KafkaStreams.StateListener> stateListeners = Collections.emptyList();
    private List<Thread.UncaughtExceptionHandler> exceptionHandlers = Collections.emptyList();

    /**
     * Decorating factory that delegates the creation of the {@link KafkaStreams} instance and
     * then registers the listeners and handlers held by the enclosing builder.
     *
     * <p>This is deliberately an inner (non-static) class: the builder's listener lists are
     * read each time {@link #make(Topology, Conf)} is invoked, not when the container is built.
     */
    private class InternalKafkaStreamsFactory implements KafkaStreamsFactory {
        private final KafkaStreamsFactory factory;
        // Assigned by build() right after the container has been constructed.
        private KafkaStreamsContainer container;

        /**
         * @param kafkaStreamsFactory the factory to delegate to.
         * @throws NullPointerException if {@code kafkaStreamsFactory} is {@code null}.
         */
        InternalKafkaStreamsFactory(KafkaStreamsFactory kafkaStreamsFactory) {
            this.factory = Objects.requireNonNull(kafkaStreamsFactory, "factory cannot be null");
        }

        /**
         * Creates the streams instance through the delegate factory and attaches the composite
         * state listener, uncaught-exception handler and global state-restore listener.
         *
         * @param topology the topology to run.
         * @param conf     the streams configuration.
         * @return the configured {@link KafkaStreams} instance.
         */
        @Override // io.streamthoughts.azkarra.api.streams.KafkaStreamsFactory
        public KafkaStreams make(Topology topology, Conf conf) {
            KafkaStreams streams = this.factory.make(topology, conf);

            // User listeners first, then the internal one forwarding transitions to the container.
            CompositeStateListener stateListener =
                new CompositeStateListener(KafkaStreamContainerBuilder.this.stateListeners);
            stateListener.addListener((newState, oldState) ->
                this.container.stateChanges(Time.SYSTEM.milliseconds(), newState, oldState));

            // The internal handler is registered before the user-supplied ones.
            CompositeUncaughtExceptionHandler exceptionHandler = new CompositeUncaughtExceptionHandler();
            exceptionHandler.addHandler((thread, error) -> {
                this.container.logger().error("Handling uncaught streams thread exception: {}", error.getMessage());
                this.container.setException(error);
            });
            // The list is never null here: the builder normalizes null to an empty list.
            KafkaStreamContainerBuilder.this.exceptionHandlers.forEach(exceptionHandler::addHandler);

            streams.setStateListener(stateListener);
            streams.setUncaughtExceptionHandler(exceptionHandler);
            streams.setGlobalStateRestoreListener(
                new CompositeStateRestoreListener(KafkaStreamContainerBuilder.this.restoreListeners));
            return streams;
        }

        private void setContainer(KafkaStreamsContainer kafkaStreamsContainer) {
            this.container = kafkaStreamsContainer;
        }
    }

    /**
     * Creates a new, empty builder.
     *
     * @return a new {@link KafkaStreamContainerBuilder}.
     */
    public static KafkaStreamContainerBuilder newBuilder() {
        return new KafkaStreamContainerBuilder();
    }

    private KafkaStreamContainerBuilder() {
    }

    /**
     * Sets the factory used to create the underlying {@link KafkaStreams} instance.
     *
     * @param kafkaStreamsFactory the factory; must be non-null by the time {@link #build()} is called.
     * @return {@code this} builder.
     */
    public KafkaStreamContainerBuilder withKafkaStreamsFactory(KafkaStreamsFactory kafkaStreamsFactory) {
        this.kafkaStreamsFactory = kafkaStreamsFactory;
        return this;
    }

    /**
     * Sets the topology container the built streams container will run.
     *
     * @param topologyContainer the container; must be non-null by the time {@link #build()} is called.
     * @return {@code this} builder.
     */
    public KafkaStreamContainerBuilder withTopologyContainer(TopologyContainer topologyContainer) {
        this.topologyContainer = topologyContainer;
        return this;
    }

    /**
     * Sets the state-restore listeners to register on created streams instances.
     *
     * @param list the listeners; {@code null} is treated as an empty list.
     * @return {@code this} builder.
     */
    public KafkaStreamContainerBuilder withRestoreListeners(List<StateRestoreListener> list) {
        this.restoreListeners = emptyIfNull(list);
        return this;
    }

    /**
     * Sets the additional uncaught-exception handlers to register on created streams instances.
     *
     * @param list the handlers; {@code null} is treated as an empty list.
     * @return {@code this} builder.
     */
    public KafkaStreamContainerBuilder withUncaughtExceptionHandler(List<Thread.UncaughtExceptionHandler> list) {
        this.exceptionHandlers = emptyIfNull(list);
        return this;
    }

    /**
     * Sets the state listeners to register on created streams instances.
     *
     * @param list the listeners; {@code null} is treated as an empty list.
     * @return {@code this} builder.
     */
    public KafkaStreamContainerBuilder withStateListeners(List<KafkaStreams.StateListener> list) {
        this.stateListeners = emptyIfNull(list);
        return this;
    }

    /**
     * Builds the {@link KafkaStreamsContainer}.
     *
     * <p>If the topology's streams configuration has no {@code rocksdb.config.setter} path,
     * {@link DefaultRocksDBConfigSetter} is added as a fallback value. The resulting
     * configuration is written back to the topology container.
     *
     * @return a new container bound to the decorated streams factory.
     * @throws NullPointerException if the topology container or the streams factory was not set.
     */
    public KafkaStreamsContainer build() {
        // Validate everything up front so a failed build leaves the topology container untouched.
        Objects.requireNonNull(this.topologyContainer, "topologyContainer cannot be null");
        InternalKafkaStreamsFactory internalFactory = new InternalKafkaStreamsFactory(this.kafkaStreamsFactory);

        Conf streamsConfig = this.topologyContainer.streamsConfig();
        Conf defaults = Conf.empty();
        if (!streamsConfig.hasPath("rocksdb.config.setter")) {
            defaults = defaults.withFallback(Conf.with("rocksdb.config.setter", DefaultRocksDBConfigSetter.class.getName()));
        }
        this.topologyContainer.streamsConfig(streamsConfig.withFallback(defaults));

        KafkaStreamsContainer container = new KafkaStreamsContainer(this.topologyContainer, internalFactory);
        // The factory's callbacks need the container, which can only exist after the factory.
        internalFactory.setContainer(container);
        return container;
    }

    /** Returns the given list, or an empty list when it is {@code null}. */
    private static <T> List<T> emptyIfNull(List<T> list) {
        return list != null ? list : Collections.<T>emptyList();
    }
}
