package io.simplesource.saga.client.api;

import io.simplesource.data.FutureResult;
import io.simplesource.data.Result;
import io.simplesource.kafka.dsl.KafkaConfig;
import io.simplesource.kafka.internal.client.KafkaRequestAPI;
import io.simplesource.kafka.internal.client.RequestAPIContext;
import io.simplesource.kafka.spec.TopicSpec;
import io.simplesource.kafka.spec.WindowSpec;
import io.simplesource.saga.model.api.SagaAPI;
import io.simplesource.saga.model.messages.SagaRequest;
import io.simplesource.saga.model.messages.SagaResponse;
import io.simplesource.saga.model.saga.SagaError;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.serdes.SagaSerdes;
import io.simplesource.saga.model.specs.SagaSpec;
import io.simplesource.saga.shared.streams.StreamAppUtils;
import io.simplesource.saga.shared.topics.TopicConfig;
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:io/simplesource/saga/client/api/KafkaSagaAPI.class */
public final class KafkaSagaAPI<A> implements SagaAPI<A> {
    private final KafkaRequestAPI<SagaId, SagaRequest<A>, SagaId, SagaResponse> requestApi;

    public KafkaSagaAPI(SagaSpec<A> sagaSpec, KafkaConfig kafkaConfig, TopicConfig topicConfig, String str, ScheduledExecutorService scheduledExecutorService) {
        SagaSerdes sagaSerdes = sagaSpec.serdes;
        this.requestApi = new KafkaRequestAPI<>(RequestAPIContext.builder().kafkaConfig(kafkaConfig).requestTopic(topicConfig.namer.apply("saga_request")).responseTopicMapTopic(topicConfig.namer.apply("saga_response_topic_map")).privateResponseTopic(topicConfig.namer.apply("saga_response") + "_" + str).requestKeySerde(sagaSerdes.sagaId()).requestValueSerde(sagaSerdes.request()).responseKeySerde(sagaSerdes.sagaId()).responseValueSerde(sagaSerdes.response()).responseWindowSpec(new WindowSpec(TimeUnit.DAYS.toSeconds(7L))).outputTopicConfig((TopicSpec) topicConfig.topicSpecs.get("saga_response")).errorValue((sagaRequest, th) -> {
            return new SagaResponse(sagaRequest.sagaId, Result.failure(SagaError.of(SagaError.Reason.InternalError, th), new SagaError[0]));
        }).scheduler(scheduledExecutorService).uuidToResponseId(SagaId::of).responseIdToUuid((v0) -> {
            return v0.id();
        }).build());
        StreamAppUtils.addShutdownHook(() -> {
            StreamAppUtils.shutdownExecutorService(scheduledExecutorService);
            this.requestApi.close();
        });
    }

    public FutureResult<SagaError, SagaId> submitSaga(SagaRequest<A> sagaRequest) {
        return this.requestApi.publishRequest(sagaRequest.sagaId, sagaRequest.sagaId, sagaRequest).errorMap(exc -> {
            return SagaError.of(SagaError.Reason.InternalError, exc);
        }).map(publishResult -> {
            return sagaRequest.sagaId;
        });
    }

    public FutureResult<SagaError, SagaResponse> getSagaResponse(SagaId sagaId, Duration duration) {
        return FutureResult.ofFuture(this.requestApi.queryResponse(sagaId, duration), exc -> {
            return SagaError.of(SagaError.Reason.InternalError, exc);
        });
    }
}
