package io.simplesource.saga.action.internal;

import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.specs.ActionProcessorSpec;
import io.simplesource.saga.shared.topics.TopicNamer;
import io.simplesource.saga.shared.topics.TopicTypes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/action/internal/ActionConsumer.class */
/**
 * Static factory helpers that open the Kafka Streams sources for the action
 * request and action response topics of an action processor.
 *
 * <p>Both helpers resolve the topic name through the supplied {@link TopicNamer},
 * read it with the serdes carried by the {@link ActionProcessorSpec}, and attach
 * a {@code peek} stage built by {@code Utils.logValues} (presumably a per-record
 * logging hook; confirm against {@code Utils}).
 */
final class ActionConsumer {
    // Shared logger; declared final so it can never be reassigned.
    private static final Logger logger = LoggerFactory.getLogger(ActionConsumer.class);

    ActionConsumer() {
    }

    /**
     * Opens a stream over the action request topic.
     *
     * @param actionProcessorSpec spec whose {@code serdes} provide the saga id (key)
     *                            and request (value) serdes
     * @param topicNamer          maps {@code TopicTypes.ActionTopic.request} to the topic name
     * @param streamsBuilder      builder the source stream is registered on
     * @param <A>                 action command type carried by the request
     * @return the request stream keyed by {@link SagaId}, with the
     *         {@code "actionRequestStream"} peek stage attached
     */
    public static <A> KStream<SagaId, ActionRequest<A>> actionRequestStream(
            ActionProcessorSpec<A> actionProcessorSpec,
            TopicNamer topicNamer,
            StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(
                        topicNamer.apply(TopicTypes.ActionTopic.request),
                        Consumed.with(
                                actionProcessorSpec.serdes.sagaId(),
                                actionProcessorSpec.serdes.request()))
                .peek(Utils.logValues(logger, "actionRequestStream"));
    }

    /**
     * Opens a stream over the action response topic.
     *
     * @param actionProcessorSpec spec whose {@code serdes} provide the saga id (key)
     *                            and response (value) serdes
     * @param topicNamer          maps {@code TopicTypes.ActionTopic.response} to the topic name
     * @param streamsBuilder      builder the source stream is registered on
     * @param <A>                 action command type of the processor spec
     * @return the response stream keyed by {@link SagaId}, with the
     *         {@code "actionResponseStream"} peek stage attached
     */
    public static <A> KStream<SagaId, ActionResponse> actionResponseStream(
            ActionProcessorSpec<A> actionProcessorSpec,
            TopicNamer topicNamer,
            StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(
                        topicNamer.apply(TopicTypes.ActionTopic.response),
                        Consumed.with(
                                actionProcessorSpec.serdes.sagaId(),
                                actionProcessorSpec.serdes.response()))
                .peek(Utils.logValues(logger, "actionResponseStream"));
    }
}
