package io.simplesource.saga.action.internal;

import io.simplesource.saga.action.async.AsyncContext;
import io.simplesource.saga.action.async.AsyncSpec;
import io.simplesource.saga.model.specs.ActionSpec;
import io.simplesource.saga.shared.kafka.KafkaPublisher;
import io.simplesource.saga.shared.properties.PropertiesBuilder;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:io/simplesource/saga/action/internal/AsyncProcessor.class */
/**
 * Wires together the pieces of an asynchronous action processor: a Kafka producer used to
 * publish results, and a {@code ConsumerRunner} (started on its own thread) that hands each
 * incoming action request to {@code AsyncInvoker.processActionRequest}.
 */
final class AsyncProcessor {
    private static final boolean useTransactions = false;

    AsyncProcessor() {
    }

    /**
     * Builds and starts the async action pipeline described by {@code asyncContext}.
     *
     * <p>The producer is configured from {@code buildSteps} as given; the consumer is configured
     * from the same steps with an extra {@code group.id} property derived from the async spec's
     * group id and action type.
     *
     * @param asyncContext supplies the async spec, the action spec (with its serdes) and the
     *                     topic namer used to resolve the request topic
     * @param buildSteps   base property build steps shared by the producer and the consumer
     * @return a pipe whose operation delegates to {@code close} on the started consumer runner
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A, D, K, O, R> AsyncPipe apply(AsyncContext<A, D, K, O, R> asyncContext, PropertiesBuilder.BuildSteps buildSteps) {
        AsyncSpec<A, D, K, O, R> asyncSpec = asyncContext.asyncSpec;
        ActionSpec<A> actionSpec = asyncContext.actionSpec;

        // Consumer-only configuration: the group id is specific to this action type.
        PropertiesBuilder.BuildSteps consumerSteps = buildSteps.withNextStep(propertiesBuilder -> {
            return propertiesBuilder.withProperty("group.id", asyncSpec.groupId + "_async_consumer_" + asyncSpec.actionType);
        });

        // A single byte-array producer is shared by every publisher created below.
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(
                buildSteps.build(PropertiesBuilder.Target.Producer),
                Serdes.ByteArray().serializer(),
                Serdes.ByteArray().serializer());

        // Publishes action responses keyed by saga id.
        KafkaPublisher responsePublisher = new KafkaPublisher(kafkaProducer, actionSpec.serdes.sagaId(), actionSpec.serdes.response());
        AsyncPublisher asyncPublisher = responsePublisher::send;

        // Creates a publisher for an arbitrary output topic from that topic's key/value serdes.
        // NOTE(review): the decompiler erased this Function's type arguments; as a raw Function
        // the lambda parameter is Object. Restore the original parameterisation (serdes type ->
        // AsyncPublisher) once the serdes type is confirmed from the caller side.
        Function function = topicSerdes -> {
            KafkaPublisher topicPublisher = new KafkaPublisher(kafkaProducer, topicSerdes.key, topicSerdes.value);
            return topicPublisher::send;
        };

        ConsumerRunner consumerRunner = new ConsumerRunner(
                consumerSteps.build(PropertiesBuilder.Target.Consumer),
                (sagaId, actionRequest) -> {
                    AsyncInvoker.processActionRequest(asyncContext, sagaId, actionRequest, asyncPublisher, function);
                },
                actionSpec.serdes.sagaId(),
                actionSpec.serdes.request(),
                asyncContext.actionTopicNamer.apply("action_request_unprocessed"),
                // Presumably invoked by the runner when it shuts down (verify in ConsumerRunner):
                // drain pending records, then release the shared producer.
                unused -> {
                    kafkaProducer.flush();
                    kafkaProducer.close();
                });

        new Thread(consumerRunner).start();
        return consumerRunner::close;
    }
}
