package io.simplesource.saga.action.async;

import io.simplesource.saga.action.app.ActionProcessor;
import io.simplesource.saga.action.internal.ActionTopologyContext;
import io.simplesource.saga.action.internal.AsyncPipe;
import io.simplesource.saga.action.internal.AsyncStream;
import io.simplesource.saga.model.specs.ActionSpec;
import io.simplesource.saga.shared.streams.StreamBuildSpec;
import io.simplesource.saga.shared.topics.TopicConfig;
import io.simplesource.saga.shared.topics.TopicConfigBuilder;
import io.simplesource.saga.shared.topics.TopicTypes;
import io.simplesource.saga.shared.topics.TopicUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/* loaded from: input_file:io/simplesource/saga/action/async/AsyncBuilder.class */
/**
 * Static factory methods that turn an {@link AsyncSpec} into an {@link ActionProcessor}.
 *
 * <p>Every overload funnels into
 * {@link #apply(AsyncSpec, TopicConfigBuilder.BuildSteps, ScheduledExecutorService)}; the shorter
 * overloads only supply defaults (identity build steps and/or no caller-provided executor).
 */
public final class AsyncBuilder {
    /**
     * Builds an action processor for the given async spec.
     *
     * <p>When the returned processor is applied to an action app context it:
     * <ol>
     *   <li>collects the topic types: everything in {@code TopicTypes.ActionTopic.all}, plus
     *       {@code "action_output"} only when {@code asyncSpec.resultSpec} and its
     *       {@code outputSerdes} are both present, plus {@code "action_request_unprocessed"};</li>
     *   <li>builds the topic configuration, installing the action topic base name (derived from
     *       {@code asyncSpec.actionType}) as the initial build step of {@code buildSteps};</li>
     *   <li>returns a {@link StreamBuildSpec} whose topology builder adds the async sub-topology
     *       and hands back a close callback.</li>
     * </ol>
     *
     * <p>Executor ownership: if {@code scheduledExecutorService} is {@code null}, a
     * single-threaded scheduled pool is created per topology build and is shut down by the close
     * callback (or immediately, if adding the sub-topology fails). A caller-supplied executor is
     * never shut down here.
     *
     * @param asyncSpec                description of the async action
     * @param buildSteps               topic configuration steps applied after the base-name step
     * @param scheduledExecutorService executor to use, or {@code null} to create an owned one
     * @return the action processor
     */
    public static <A, D, K, O, R> ActionProcessor<A> apply(AsyncSpec<A, D, K, O, R> asyncSpec, TopicConfigBuilder.BuildSteps buildSteps, ScheduledExecutorService scheduledExecutorService) {
        return actionAppContext -> {
            ActionSpec<A> actionSpec = actionAppContext.actionSpec;

            // NOTE(review): assumes TopicTypes.ActionTopic.all is a collection of String topic types.
            List<String> topicTypes = new ArrayList<>(TopicTypes.ActionTopic.all);
            asyncSpec.resultSpec.ifPresent(asyncResult ->
                    asyncResult.outputSerdes.ifPresent(topicSerdes -> topicTypes.add("action_output")));
            topicTypes.add("action_request_unprocessed");

            TopicConfig topicConfig = TopicConfigBuilder.build(
                    topicTypes,
                    buildSteps.withInitialStep(topicConfigBuilder ->
                            topicConfigBuilder.withTopicBaseName(TopicUtils.actionTopicBaseName(asyncSpec.actionType))));

            return new StreamBuildSpec(topicConfig.allTopics(), streamsBuilder -> {
                ActionTopologyContext topologyContext = ActionTopologyContext.of(actionSpec, topicConfig.namer, actionAppContext.properties, streamsBuilder);

                // Only an executor created here is ours to shut down.
                boolean ownsExecutor = scheduledExecutorService == null;
                ScheduledExecutorService executor = ownsExecutor ? Executors.newScheduledThreadPool(1) : scheduledExecutorService;

                AsyncPipe pipe;
                try {
                    pipe = AsyncStream.addSubTopology(topologyContext, new AsyncContext(actionSpec, topicConfig.namer, asyncSpec, executor));
                } catch (RuntimeException | Error e) {
                    // No close callback will ever be returned on this path, so release the
                    // owned pool now instead of leaking it.
                    if (ownsExecutor) {
                        executor.shutdown();
                    }
                    throw e;
                }

                return Optional.of(() -> {
                    if (ownsExecutor) {
                        executor.shutdown();
                    }
                    pipe.close();
                });
            });
        };
    }

    /**
     * Builds an action processor using the given topic build steps and an internally created
     * executor.
     *
     * @param asyncSpec  description of the async action
     * @param buildSteps topic configuration steps applied after the base-name step
     * @return the action processor
     */
    public static <A, D, K, O, R> ActionProcessor<A> apply(AsyncSpec<A, D, K, O, R> asyncSpec, TopicConfigBuilder.BuildSteps buildSteps) {
        return apply(asyncSpec, buildSteps, null);
    }

    /**
     * Builds an action processor with identity topic build steps and an internally created
     * executor.
     *
     * @param asyncSpec description of the async action
     * @return the action processor
     */
    public static <A, D, K, O, R> ActionProcessor<A> apply(AsyncSpec<A, D, K, O, R> asyncSpec) {
        // The cast selects the executor overload; a bare null would be ambiguous.
        return apply(asyncSpec, (ScheduledExecutorService) null);
    }

    /**
     * Builds an action processor with identity topic build steps and the given executor.
     *
     * @param asyncSpec                description of the async action
     * @param scheduledExecutorService executor to use, or {@code null} to create an owned one
     * @return the action processor
     */
    public static <A, D, K, O, R> ActionProcessor<A> apply(AsyncSpec<A, D, K, O, R> asyncSpec, ScheduledExecutorService scheduledExecutorService) {
        return apply(asyncSpec, topicConfigBuilder -> topicConfigBuilder, scheduledExecutorService);
    }
}
