package io.simplesource.saga.action.internal;

import io.simplesource.api.CommandError;
import io.simplesource.api.CommandId;
import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.CommandSerdes;
import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.CommandResponse;
import io.simplesource.saga.action.eventsourcing.EventSourcingContext;
import io.simplesource.saga.action.internal.IdempotentStream;
import io.simplesource.saga.model.messages.ActionRequest;
import io.simplesource.saga.model.messages.ActionResponse;
import io.simplesource.saga.model.saga.SagaError;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.serdes.ActionSerdes;
import io.simplesource.saga.shared.serialization.TupleSerdes;
import io.simplesource.saga.shared.streams.StreamUtils;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/action/internal/EventSourcingStream.class */
/**
 * Builds the Kafka Streams sub-topology that connects saga action requests to an
 * event sourced command processor.
 *
 * <p>Action requests whose command decodes successfully are turned into command
 * requests; requests that fail to decode are answered with an error action
 * response. Command responses are joined back to their action request (by command
 * id) and published as action responses.
 *
 * <p>NOTE: Kafka Streams derives generated processor, state store and repartition
 * topic names from the order in which DSL operators are added. The order and number
 * of DSL calls in this class is therefore kept stable on purpose.
 */
public final class EventSourcingStream {
    private static final Logger logger = LoggerFactory.getLogger(EventSourcingStream.class);

    /**
     * Adds the event sourcing sub-topology to the topology held by {@code actionTopologyContext}.
     *
     * @param actionTopologyContext supplies the stream builder and the action request / response streams
     * @param eventSourcingContext  supplies the action spec, event sourcing spec, serdes and topic namers
     * @param <A> action command representation type
     * @param <D> decoded (intermediate) command type
     * @param <K> aggregate key type
     * @param <C> command type sent to the command processor
     */
    public static <A, D, K, C> void addSubTopology(ActionTopologyContext<A> actionTopologyContext, EventSourcingContext<A, D, K, C> eventSourcingContext) {
        KStream<K, CommandResponse<K>> commandResponses = EventSourcingConsumer.commandResponseStream(
                eventSourcingContext.eventSourcingSpec,
                eventSourcingContext.commandTopicNamer,
                actionTopologyContext.builder);
        addSubTopology(
                eventSourcingContext,
                actionTopologyContext.actionRequests,
                actionTopologyContext.actionResponses,
                commandResponses);
    }

    /**
     * Wires request handling, response handling and publishing for the given streams.
     */
    private static <A, D, K, C> void addSubTopology(
            EventSourcingContext<A, D, K, C> ctx,
            KStream<SagaId, ActionRequest<A>> actionRequests,
            KStream<SagaId, ActionResponse> actionResponses,
            KStream<K, CommandResponse<K>> commandResponsesByAggregate) {
        KStream<CommandId, CommandResponse<K>> commandResponsesByCommandId =
                commandResponsesByAggregate.selectKey((aggregateKey, response) -> response.commandId());

        IdempotentStream.IdempotentAction<A> idempotentAction =
                IdempotentStream.getActionRequestsWithResponse(ctx.actionSpec, actionRequests, actionResponses);

        // Only requests that do not yet have a response are turned into new command requests.
        Tuple2<KStream<SagaId, ActionResponse>, KStream<K, CommandRequest<K, C>>> handled =
                handleActionRequest(ctx, idempotentAction.unprocessedRequests, commandResponsesByAggregate);
        KStream<SagaId, ActionResponse> decodeErrorResponses = handled.v1();
        KStream<K, CommandRequest<K, C>> commandRequests = handled.v2();

        // Command responses are matched against the full action request stream.
        KStream<SagaId, ActionResponse> newActionResponses =
                handleCommandResponse(ctx, actionRequests, commandResponsesByCommandId);

        ActionContext<A> actionContext = ctx.getActionContext();
        EventSourcingPublisher.publishCommandRequest(ctx, commandRequests);
        ActionPublisher.publishActionResponse(actionContext, idempotentAction.priorResponses);
        ActionPublisher.publishActionResponse(actionContext, newActionResponses);
        ActionPublisher.publishActionResponse(actionContext, decodeErrorResponses);
    }

    /**
     * Decodes the command carried by {@code request}.
     *
     * @return the decoded value, or {@code null} when decoding failed and assertions are disabled
     * @throws AssertionError when decoding failed and assertions are enabled
     */
    private static <A, D> D getDecoded(EventSourcingContext<A, D, ?, ?> ctx, ActionRequest<A> request) {
        D decoded = ctx.eventSourcingSpec.decode.apply(request.actionCommand.command).getOrElse(null);
        assert decoded != null;
        return decoded;
    }

    /**
     * Splits action requests by whether their command decodes.
     *
     * @return a pair of (error action responses for requests that failed to decode,
     *         command requests keyed by aggregate key for requests that decoded)
     */
    private static <A, D, K, C> Tuple2<KStream<SagaId, ActionResponse>, KStream<K, CommandRequest<K, C>>> handleActionRequest(
            EventSourcingContext<A, D, K, C> ctx,
            KStream<SagaId, ActionRequest<A>> actionRequests,
            KStream<K, CommandResponse<K>> commandResponsesByAggregate) {
        KStream<SagaId, Tuple2<ActionRequest<A>, Result<Throwable, D>>> decodedRequests =
                actionRequests.mapValues((sagaId, request) -> {
                    Result<Throwable, D> decoded = ctx.eventSourcingSpec.decode.apply(request.actionCommand.command);
                    return Tuple2.of(request, decoded);
                });
        KStream<SagaId, Tuple2<ActionRequest<A>, Result<Throwable, D>>> requestsWithDecoded =
                decodedRequests.peek(StreamUtils.logValues(logger, "reqsWithDecoded"));

        // branches[0]: decode succeeded, branches[1]: decode failed.
        @SuppressWarnings("unchecked")
        KStream<SagaId, Tuple2<ActionRequest<A>, Result<Throwable, D>>>[] branches = requestsWithDecoded.branch(
                (sagaId, pair) -> pair.v2().isSuccess(),
                (sagaId, pair) -> pair.v2().isFailure());

        KStream<SagaId, ActionResponse> errorResponses = branches[1].mapValues((sagaId, pair) -> {
            ActionRequest<A> request = pair.v1();
            Throwable reason = pair.v2().failureReasons().get().head();
            return new ActionResponse(
                    request.sagaId,
                    request.actionId,
                    request.actionCommand.commandId,
                    Result.failure(SagaError.of(SagaError.Reason.InternalError, reason)));
        });

        // Only successfully decoded requests may become command requests. Previously every
        // request (including decode failures) flowed down this path, so a failed decode hit
        // the assertion in getDecoded, or passed null to keyMapper when assertions are off.
        // The mapValues step is retained so the number of DSL operators stays the same.
        KStream<SagaId, Tuple2<ActionRequest<A>, D>> validRequests =
                branches[0].mapValues((sagaId, pair) -> Tuple2.of(pair.v1(), pair.v2().getOrElse(null)));

        KStream<Tuple2<K, SagaId>, ActionRequest<A>> requestsByAggregateAndSaga =
                validRequests.map((sagaId, pair) -> {
                    ActionRequest<A> request = pair.v1();
                    K aggregateKey = ctx.eventSourcingSpec.keyMapper.apply(getDecoded(ctx, request));
                    return KeyValue.pair(Tuple2.of(aggregateKey, request.sagaId), request);
                });

        KTable<Tuple2<K, SagaId>, Long> latestSequences =
                latestSequenceNumbersForSagaAggregate(ctx, actionRequests, commandResponsesByAggregate);

        KStream<K, CommandRequest<K, C>> commandRequests = requestsByAggregateAndSaga
                .leftJoin(
                        latestSequences,
                        (request, latestSequence) -> toCommandRequest(ctx, request, latestSequence),
                        Joined.with(
                                TupleSerdes.tuple2(ctx.cSerdes().aggregateKey(), ctx.aSerdes().sagaId()),
                                ctx.aSerdes().request(),
                                Serdes.Long()))
                .selectKey((aggregateAndSaga, commandRequest) -> commandRequest.aggregateKey())
                .peek(StreamUtils.logValues(logger, "commandRequestByAggregate"));

        return Tuple2.of(errorResponses, commandRequests);
    }

    /**
     * Builds the command request for a decoded action request.
     *
     * @param latestSequence latest sequence seen for this (aggregate, saga) pair, or {@code null}
     *                       when the left join found none, in which case the spec's
     *                       {@code sequenceMapper} supplies the read sequence
     */
    private static <A, D, K, C> CommandRequest<K, C> toCommandRequest(
            EventSourcingContext<A, D, K, C> ctx,
            ActionRequest<A> request,
            Long latestSequence) {
        D decoded = getDecoded(ctx, request);
        return new CommandRequest<>(
                request.actionCommand.commandId,
                ctx.eventSourcingSpec.keyMapper.apply(decoded),
                latestSequence == null
                        ? ctx.eventSourcingSpec.sequenceMapper.apply(decoded)
                        : Sequence.position(latestSequence),
                ctx.eventSourcingSpec.commandMapper.apply(decoded));
    }

    /**
     * Builds a table of the highest sequence number returned by the command processor
     * for each (aggregate key, saga id) pair.
     *
     * <p>Command responses are joined to action requests on command id within the action
     * spec's timeout window. A response without a successful sequence result contributes
     * {@link Sequence#first()}.
     */
    private static <A, K> KTable<Tuple2<K, SagaId>, Long> latestSequenceNumbersForSagaAggregate(
            EventSourcingContext<A, ?, K, ?> ctx,
            KStream<SagaId, ActionRequest<A>> actionRequests,
            KStream<K, CommandResponse<K>> commandResponsesByAggregate) {
        CommandSerdes<K, ?> cSerdes = ctx.cSerdes();
        ActionSerdes<A> aSerdes = ctx.aSerdes();
        Serde<Tuple2<K, SagaId>> aggregateSagaSerde = TupleSerdes.tuple2(cSerdes.aggregateKey(), aSerdes.sagaId());
        Duration actionTimeout = ctx.actionSpec().timeout;

        KStream<CommandId, CommandResponse<K>> responsesByCommandId =
                commandResponsesByAggregate.selectKey((aggregateKey, response) -> response.commandId());
        KStream<CommandId, ActionRequest<A>> requestsByCommandId =
                actionRequests.selectKey((sagaId, request) -> request.actionCommand.commandId);

        KStream<CommandId, Tuple2<CommandResponse<K>, ActionRequest<A>>> responsesWithRequest =
                responsesByCommandId.join(
                        requestsByCommandId,
                        (response, request) -> Tuple2.of(response, request),
                        JoinWindows.of(actionTimeout).until(actionTimeout.toMillis() * 2 + 1),
                        Joined.with(cSerdes.commandId(), cSerdes.commandResponse(), aSerdes.request()));

        return responsesWithRequest
                .selectKey((commandId, pair) -> Tuple2.of(pair.v1().aggregateKey(), pair.v2().sagaId))
                .mapValues(pair -> pair.v1().sequenceResult().getOrElse(Sequence.first()).getSeq())
                .groupByKey(Grouped.with(aggregateSagaSerde, Serdes.Long()))
                // Keep the larger of the two sequence numbers.
                .reduce(
                        (current, candidate) -> candidate > current ? candidate : current,
                        Materialized.with(aggregateSagaSerde, Serdes.Long()));
    }

    /**
     * Joins action requests with command responses on command id (within the event sourcing
     * spec's timeout window) and converts each match into an action response keyed by saga id.
     */
    private static <A, D, K, C> KStream<SagaId, ActionResponse> handleCommandResponse(
            EventSourcingContext<A, D, K, C> ctx,
            KStream<SagaId, ActionRequest<A>> actionRequests,
            KStream<CommandId, CommandResponse<K>> commandResponsesByCommandId) {
        Duration timeout = ctx.eventSourcingSpec.timeout;

        KStream<CommandId, ActionRequest<A>> requestsByCommandId =
                actionRequests.selectKey((sagaId, request) -> request.actionCommand.commandId);

        KStream<CommandId, Tuple2<ActionRequest<A>, CommandResponse<K>>> requestsWithResponse =
                requestsByCommandId.join(
                        commandResponsesByCommandId,
                        (request, response) -> Tuple2.of(request, response),
                        JoinWindows.of(timeout).until(timeout.toMillis() * 2 + 1),
                        Joined.with(ctx.cSerdes().commandId(), ctx.aSerdes().request(), ctx.cSerdes().commandResponse()));

        return requestsWithResponse
                .peek(StreamUtils.logValues(logger, "joinActionRequestAndCommandResponse"))
                .mapValues((commandId, pair) -> toActionResponse(pair.v1(), pair.v2()))
                .selectKey((commandId, response) -> response.sagaId)
                .peek(StreamUtils.logValues(logger, "resultStream"));
    }

    /**
     * Converts a command response into the action response for {@code request}.
     *
     * <p>A failed sequence result becomes a {@code CommandError} saga error whose message is the
     * comma-joined command error messages; a successful one becomes {@code success(true)}.
     */
    private static <A, K> ActionResponse toActionResponse(ActionRequest<A> request, CommandResponse<K> response) {
        Result<CommandError, Sequence> sequenceResult;
        if (response == null) {
            // NOTE(review): the caller uses an inner join, which should never supply a null
            // response; this branch is kept as a defensive guard.
            sequenceResult = Result.failure(
                    CommandError.of(CommandError.Reason.Timeout, "Timed out waiting for response from Command Processor"));
        } else {
            sequenceResult = response.sequenceResult();
        }

        Result<SagaError, Boolean> sagaResult = sequenceResult.fold(
                errors -> Result.failure(
                        SagaError.of(SagaError.Reason.CommandError, String.join(",", errors.map(CommandError::getMessage)))),
                sequence -> Result.success(true));

        return new ActionResponse(request.sagaId, request.actionId, request.actionCommand.commandId, sagaResult);
    }
}
