package io.simplesource.saga.action.internal;

import io.simplesource.saga.action.async.AsyncContext;
import io.simplesource.saga.action.internal.IdempotentStream;
import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.saga.SagaId;
import org.apache.kafka.streams.kstream.KStream;

/* loaded from: input_file:io/simplesource/saga/action/internal/AsyncStream.class */
public final class AsyncStream {
    public static <A, D, K, O, R> AsyncPipe addSubTopology(ActionTopologyContext<A> actionTopologyContext, AsyncContext<A, D, K, O, R> asyncContext) {
        addSubTopology(asyncContext, actionTopologyContext.actionRequests, actionTopologyContext.actionResponses);
        return AsyncTransform.async(asyncContext, actionTopologyContext.properties);
    }

    private static <A, D, K, O, R> void addSubTopology(AsyncContext<A, D, K, O, R> asyncContext, KStream<SagaId, ActionRequest<A>> kStream, KStream<SagaId, ActionResponse<A>> kStream2) {
        IdempotentStream.IdempotentAction actionRequestsWithResponse = IdempotentStream.getActionRequestsWithResponse(asyncContext.actionSpec, kStream, kStream2);
        ActionPublisher.publishActionResponse(asyncContext.getActionContext(), actionRequestsWithResponse.priorResponses);
        ActionPublisher.publishActionRequest(asyncContext.getActionContext(), actionRequestsWithResponse.unprocessedRequests, true);
    }
}
