package io.simplesource.saga.action.sourcing;

import io.simplesource.saga.action.internal.ActionTopologyBuilder;
import io.simplesource.saga.action.internal.SourcingStream;
import io.simplesource.saga.model.serdes.ActionSerdes;
import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.shared.topics.TopicConfig;
import io.simplesource.saga.shared.topics.TopicConfigBuilder;
import io.simplesource.saga.shared.topics.TopicCreation;
import io.simplesource.saga.shared.topics.TopicTypes;
import io.simplesource.saga.shared.utils.StreamAppConfig;
import io.simplesource.saga.shared.utils.StreamAppUtils;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/action/sourcing/SourcingApp.class */
/**
 * Wires an action processor to one or more command specs and runs the resulting
 * Kafka Streams topology.
 *
 * <p>Usage, as far as this class shows: construct with the action serdes and a topic
 * build function, register commands via {@link #addCommand}, then call {@link #run}.
 *
 * @param <A> type parameter shared by the action serdes, the action processor spec
 *            and the topology builder
 */
public final class SourcingApp<A> {
    private static final Logger logger = LoggerFactory.getLogger(SourcingApp.class);

    /** Maximum time, in seconds, to wait for the missing topics to be created. */
    private static final long TOPIC_CREATION_TIMEOUT_SECONDS = 30L;

    private final TopicConfig actionTopicConfig;
    private final ActionProcessorSpec<A> actionSpec;
    // NOTE(review): addCommand appends to this list, so TopicCreation.allTopics is
    // presumed to return a mutable list — confirm against its implementation.
    private final List<TopicCreation> topicCreations;
    private final ActionTopologyBuilder<A> topologyBuilder;

    /**
     * Builds the action topic configuration and prepares the topology builder.
     *
     * @param actionSerdes serdes used to create the {@link ActionProcessorSpec}
     * @param buildSteps   build steps applied when building the action topic configuration
     */
    public SourcingApp(ActionSerdes<A> actionSerdes, TopicConfigBuilder.BuildSteps buildSteps) {
        this.actionTopicConfig = TopicConfigBuilder.buildTopics(
                TopicTypes.ActionTopic.all, Collections.emptyMap(), Collections.emptyMap(), buildSteps);
        this.actionSpec = new ActionProcessorSpec<>(actionSerdes);
        this.topicCreations = TopicCreation.allTopics(this.actionTopicConfig);
        this.topologyBuilder = new ActionTopologyBuilder<>(this.actionSpec, this.actionTopicConfig);
    }

    /**
     * Registers a command: its topics are added to the list of expected topics and a
     * sub-topology for it is added when the topology is built.
     *
     * @param commandSpec spec of the command to register
     * @param buildSteps  build steps applied when building the command topic configuration
     * @return this instance, to allow chaining
     */
    public <D, K, C> SourcingApp<A> addCommand(CommandSpec<A, D, K, C> commandSpec, TopicConfigBuilder.BuildSteps buildSteps) {
        TopicConfig commandTopicConfig = TopicConfigBuilder.buildTopics(
                TopicTypes.CommandTopic.all, Collections.emptyMap(), Collections.emptyMap(), buildSteps);
        this.topicCreations.addAll(TopicCreation.allTopics(commandTopicConfig));
        this.topologyBuilder.onBuildTopology(actionTopologyContext -> {
            SourcingStream.addSubTopology(
                    actionTopologyContext,
                    new SourcingContext(this.actionSpec, commandSpec, this.actionTopicConfig.namer, commandTopicConfig.namer));
        });
        return this;
    }

    /**
     * Logs the expected topics, creates any that are missing (waiting up to
     * {@value #TOPIC_CREATION_TIMEOUT_SECONDS} seconds), then builds and runs the topology.
     *
     * @param streamAppConfig configuration used for the Kafka properties and the topology
     * @throws RuntimeException if topic creation, topology construction or application
     *                          start-up fails; the underlying failure is attached as the cause
     */
    public void run(StreamAppConfig streamAppConfig) {
        Properties config = StreamAppConfig.getConfig(streamAppConfig);

        logger.info("Expected topics:");
        for (TopicCreation topicCreation : this.topicCreations) {
            logger.info(topicCreation.topicName);
        }

        try {
            // The admin client is only needed for topic creation; close it as soon as
            // that completes (or fails) rather than leaking it.
            try (AdminClient adminClient = AdminClient.create(config)) {
                StreamAppUtils.createMissingTopics(adminClient, this.topicCreations)
                        .all()
                        .get(TOPIC_CREATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            }

            Topology topology = buildTopology(streamAppConfig);
            logger.info("Topology description {}", topology.describe());
            StreamAppUtils.runStreamApp(config, topology);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
            }
            // Message kept for backward compatibility; the cause carries the real failure.
            throw new RuntimeException("Unable to create all the topics", e);
        }
    }

    /**
     * Builds the topology by delegating to the topology builder.
     *
     * @param streamAppConfig configuration passed through to the builder
     * @return the built topology
     */
    Topology buildTopology(StreamAppConfig streamAppConfig) {
        return this.topologyBuilder.build(streamAppConfig);
    }
}
