package io.simplesource.saga.action.async;

import io.simplesource.saga.action.internal.ActionTopologyBuilder;
import io.simplesource.saga.action.internal.AsyncPipe;
import io.simplesource.saga.action.internal.AsyncStream;
import io.simplesource.saga.model.serdes.ActionSerdes;
import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.shared.topics.TopicConfig;
import io.simplesource.saga.shared.topics.TopicConfigBuilder;
import io.simplesource.saga.shared.topics.TopicCreation;
import io.simplesource.saga.shared.topics.TopicTypes;
import io.simplesource.saga.shared.utils.StreamAppConfig;
import io.simplesource.saga.shared.utils.StreamAppUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/action/async/AsyncApp.class */
/**
 * Assembles and runs a Kafka Streams application that hosts asynchronous action processors.
 *
 * <p>Typical usage is to construct the app, register one or more {@link AsyncSpec}s via
 * {@link #addAsync(AsyncSpec)}, optionally supply a shared executor via
 * {@link #addExecutor(ScheduledExecutorService)}, and then call {@link #run(StreamAppConfig)}.
 *
 * @param <A> the action command type handled by this application
 */
public final class AsyncApp<A> {
    private static final Logger logger = LoggerFactory.getLogger(AsyncApp.class);

    /** Topics that must exist before the stream app starts; extended by {@link #addAsync}. */
    private final List<TopicCreation> expectedTopics;
    private final TopicConfig actionTopicConfig;
    private final ActionProcessorSpec<A> actionSpec;
    /** Optional caller-supplied executor; when {@code null} each async spec gets its own pool. */
    private ScheduledExecutorService executor;
    private ActionTopologyBuilder<A> topologyBuilder;
    /** Callbacks invoked from the shutdown hook registered in {@link #run}. */
    private final List<Supplier<Integer>> closeHandlers = new ArrayList<>();
    /** Executors handed to sub-topologies; shut down from the shutdown hook. */
    private Set<ExecutorService> executorServices = new HashSet<>();

    /**
     * Creates an async app for the standard action topics plus the "request unprocessed" topic.
     *
     * @param actionSerdes serdes used to build the action processor spec
     * @param buildSteps   topic configuration steps passed to {@link TopicConfigBuilder#buildTopics}
     */
    public AsyncApp(ActionSerdes<A> actionSerdes, TopicConfigBuilder.BuildSteps buildSteps) {
        // Copy the shared list so adding the extra topic does not mutate TopicTypes.ActionTopic.all.
        List<String> topicBaseNames = new ArrayList<>(TopicTypes.ActionTopic.all);
        topicBaseNames.add(TopicTypes.ActionTopic.requestUnprocessed);
        this.actionTopicConfig = TopicConfigBuilder.buildTopics(topicBaseNames, new HashMap<>(), new HashMap<>(), buildSteps);
        this.actionSpec = new ActionProcessorSpec<>(actionSerdes);
        this.topologyBuilder = new ActionTopologyBuilder<>(this.actionSpec, this.actionTopicConfig);
        this.expectedTopics = TopicCreation.allTopics(this.actionTopicConfig);
    }

    /**
     * Registers an async action processor.
     *
     * <p>Any output topics declared by the spec are added to the topics created by
     * {@link #run}. A build callback is registered on the topology builder; when it runs it
     * adds the async sub-topology and registers a close handler that closes the resulting pipe.
     *
     * @param asyncSpec the async processor specification
     * @return this app, for chaining
     */
    public <D, K, O, R> AsyncApp<A> addAsync(AsyncSpec<A, D, K, O, R> asyncSpec) {
        asyncSpec.outputSpec.ifPresent(asyncOutput -> this.expectedTopics.addAll(asyncOutput.topicCreations));
        this.topologyBuilder.onBuildTopology(actionTopologyContext -> {
            // The executor field is read when this callback runs, not when addAsync is called.
            ScheduledExecutorService usedExecutor =
                    this.executor != null ? this.executor : Executors.newScheduledThreadPool(1);
            this.executorServices.add(usedExecutor);
            AsyncPipe pipe = AsyncStream.addSubTopology(
                    actionTopologyContext,
                    new AsyncContext(this.actionSpec, this.actionTopicConfig.namer, asyncSpec, usedExecutor));
            addCloseHandler(() -> {
                pipe.close();
                return 0;
            });
        });
        return this;
    }

    /**
     * Sets the executor to be used by async sub-topologies instead of a per-spec thread pool.
     *
     * @param scheduledExecutorService the executor to use
     * @return this app, for chaining
     */
    public AsyncApp<A> addExecutor(ScheduledExecutorService scheduledExecutorService) {
        this.executor = scheduledExecutorService;
        return this;
    }

    /**
     * Registers a callback to be invoked from the shutdown hook installed by {@link #run}.
     * The supplier's return value is ignored.
     *
     * @param supplier the callback
     * @return this app, for chaining
     */
    public AsyncApp<A> addCloseHandler(Supplier<Integer> supplier) {
        this.closeHandlers.add(supplier);
        return this;
    }

    /**
     * Creates any missing topics (waiting up to 30 seconds), builds the topology, starts the
     * stream app and installs a shutdown hook that runs the close handlers and shuts down the
     * executors.
     *
     * @param streamAppConfig stream application configuration
     * @throws RuntimeException if any step fails; the original exception is attached as the cause
     */
    public void run(StreamAppConfig streamAppConfig) {
        Properties config = StreamAppConfig.getConfig(streamAppConfig);
        try {
            // The admin client is only needed for topic creation; close it once that completes.
            try (AdminClient adminClient = AdminClient.create(config)) {
                StreamAppUtils.createMissingTopics(adminClient, this.expectedTopics)
                        .all()
                        .get(30L, TimeUnit.SECONDS);
            }
            Topology topology = buildTopology(streamAppConfig);
            logger.info("Topology description {}", topology.describe());
            StreamAppUtils.runStreamApp(config, topology);
            StreamAppUtils.addShutdownHook(() -> {
                logger.info("Shutting down async app resources");
                this.closeHandlers.forEach(Supplier::get);
                this.executorServices.forEach(StreamAppUtils::shutdownExecutorService);
            });
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
            // Keep the cause: this block also covers topology build and stream start,
            // so the message alone does not identify what actually failed.
            throw new RuntimeException("Unable to create missing topics", e);
        }
    }

    /**
     * Builds the Kafka Streams topology, running every build callback registered via
     * {@link #addAsync}.
     *
     * @param streamAppConfig stream application configuration
     * @return the built topology
     */
    public Topology buildTopology(StreamAppConfig streamAppConfig) {
        return this.topologyBuilder.build(streamAppConfig);
    }
}
