package io.simplesource.saga.action.internal;

import io.simplesource.saga.action.async.AsyncContext;
import io.simplesource.saga.action.async.AsyncSpec;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

/* loaded from: input_file:io/simplesource/saga/action/internal/AsyncTransform.class */
final class AsyncTransform {
    static final boolean useTransactions = false;

    AsyncTransform() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, V> ProducerRecord<byte[], byte[]> toBytes(ProducerRecord<K, V> producerRecord, Serde<K> serde, Serde<V> serde2) {
        String str = producerRecord.topic();
        return new ProducerRecord<>(str, serde.serializer().serialize(str, producerRecord.key()), serde2.serializer().serialize(str, producerRecord.value()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A, D, K, O, R> AsyncPipe async(AsyncContext<A, D, K, O, R> asyncContext, Properties properties) {
        AsyncSpec<A, D, K, O, R> asyncSpec = asyncContext.asyncSpec;
        Function function = properties2 -> {
            Properties properties2 = new Properties();
            properties2.forEach((obj, obj2) -> {
                properties2.setProperty(obj.toString(), obj2.toString());
            });
            return properties2;
        };
        Properties properties3 = (Properties) function.apply(properties);
        properties3.setProperty("group.id", asyncSpec.groupId + "_async_consumer_" + asyncSpec.actionType);
        properties3.setProperty("enable.auto.commit", "true");
        properties3.setProperty("auto.commit.interval.ms", "1000");
        properties3.setProperty("isolation.level", "read_committed");
        Properties properties4 = (Properties) function.apply(properties);
        properties4.setProperty("enable.idempotence", "true");
        KafkaProducer kafkaProducer = new KafkaProducer(properties4, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
        AsyncConsumerRunner asyncConsumerRunner = new AsyncConsumerRunner(asyncContext, properties3, new KafkaAsyncPublisher(kafkaProducer, asyncContext.actionSpec.serdes.sagaId(), asyncContext.actionSpec.serdes.response()), asyncSerdes -> {
            return new KafkaAsyncPublisher(kafkaProducer, asyncSerdes.key, asyncSerdes.output);
        }, bool -> {
            kafkaProducer.flush();
            kafkaProducer.close();
        });
        new Thread(asyncConsumerRunner).start();
        Objects.requireNonNull(asyncConsumerRunner);
        return asyncConsumerRunner::close;
    }
}
