package io.simplesource.kafka.internal.streams;

import io.simplesource.api.Aggregator;
import io.simplesource.api.CommandAPI;
import io.simplesource.api.InitialValue;
import io.simplesource.data.Result;
import io.simplesource.data.Sequence;
import io.simplesource.kafka.api.AggregateResources;
import io.simplesource.kafka.api.AggregateSerdes;
import io.simplesource.kafka.model.AggregateUpdate;
import io.simplesource.kafka.model.AggregateUpdateResult;
import io.simplesource.kafka.model.CommandEvents;
import io.simplesource.kafka.model.CommandRequest;
import io.simplesource.kafka.model.ValueWithSequence;
import io.simplesource.kafka.spec.AggregateSpec;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/kafka/internal/streams/EventSourcedTopology.class */
public final class EventSourcedTopology<K, C, E, A> {
    private static final Logger logger = LoggerFactory.getLogger(EventSourcedTopology.class);
    private final AggregateSpec<K, C, E, A> aggregateSpec;
    private final long commandResponseRetentionInSeconds;
    private final AggregateSerdes<K, C, E, A> serdes;
    private final Aggregator<E, A> aggregator;
    private final InitialValue<K, A> initialValue;
    private final Consumed<K, CommandRequest<C>> commandEventsConsumed;
    private final Produced<K, ValueWithSequence<E>> eventsConsumedProduced;
    private final Produced<K, AggregateUpdate<A>> aggregatedUpdateProduced;
    private final Serialized<UUID, AggregateUpdateResult<A>> serializedAggregateUpdate;

    /* loaded from: input_file:io/simplesource/kafka/internal/streams/EventSourcedTopology$CommandRequestTransformer.class */
    private final class CommandRequestTransformer implements ValueTransformerWithKey<K, CommandRequest<C>, CommandEvents<E, A>> {
        private ReadOnlyKeyValueStore<K, AggregateUpdate<A>> stateStore;

        private CommandRequestTransformer() {
        }

        public void init(ProcessorContext processorContext) {
            this.stateStore = processorContext.getStateStore(EventSourcedTopology.this.storeName(AggregateResources.StateStoreEntity.aggregate_update));
        }

        public CommandEvents<E, A> transform(K k, CommandRequest<C> commandRequest) {
            AggregateUpdate of;
            Result failure;
            try {
                of = (AggregateUpdate) Optional.ofNullable(this.stateStore.get(k)).orElse(AggregateUpdate.of(EventSourcedTopology.this.initialValue.empty(k)));
            } catch (Exception e) {
                of = AggregateUpdate.of(EventSourcedTopology.this.initialValue.empty(k));
            }
            AggregateUpdate aggregateUpdate = of;
            try {
                failure = EventSourcedTopology.this.aggregateSpec.generation().commandHandler().interpretCommand(k, commandRequest.readSequence(), aggregateUpdate.sequence(), aggregateUpdate.aggregate(), commandRequest.command());
            } catch (Exception e2) {
                EventSourcedTopology.logger.warn("[{} aggregate] Failed to apply command handler on key {} to request {}", new Object[]{EventSourcedTopology.this.aggregateSpec.aggregateName(), k, commandRequest, e2});
                failure = Result.failure(CommandAPI.CommandError.CommandHandlerFailed, e2);
            }
            return new CommandEvents<>(commandRequest.commandId(), commandRequest.readSequence(), aggregateUpdate.aggregate(), failure.map(nonEmptyList -> {
                Sequence[] sequenceArr = {aggregateUpdate.sequence()};
                return nonEmptyList.map(obj -> {
                    sequenceArr[0] = sequenceArr[0].next();
                    return new ValueWithSequence(obj, sequenceArr[0]);
                });
            }));
        }

        public void close() {
        }

        public /* bridge */ /* synthetic */ Object transform(Object obj, Object obj2) {
            return transform((CommandRequestTransformer) obj, (CommandRequest) obj2);
        }
    }

    public EventSourcedTopology(AggregateSpec<K, C, E, A> aggregateSpec) {
        this.aggregateSpec = aggregateSpec;
        this.commandResponseRetentionInSeconds = aggregateSpec.generation().stateStoreSpec().retentionInSeconds();
        this.serdes = aggregateSpec.serialization().serdes();
        this.aggregator = aggregateSpec.generation().aggregator();
        this.initialValue = aggregateSpec.generation().initialValue();
        this.commandEventsConsumed = Consumed.with(this.serdes.aggregateKey(), this.serdes.commandRequest());
        this.eventsConsumedProduced = Produced.with(this.serdes.aggregateKey(), this.serdes.valueWithSequence());
        this.aggregatedUpdateProduced = Produced.with(this.serdes.aggregateKey(), this.serdes.aggregateUpdate());
        this.serializedAggregateUpdate = Serialized.with(this.serdes.commandResponseKey(), this.serdes.updateResult());
    }

    public void addTopology(StreamsBuilder streamsBuilder) {
        addStateStores(streamsBuilder);
        KStream<K, CommandEvents<E, A>> eventResultStream = eventResultStream(streamsBuilder);
        publishEvents(eventResultStream);
        KStream<K, AggregateUpdateResult<A>> aggregateUpdateStream = aggregateUpdateStream(eventResultStream);
        publishAggregateUpdates(aggregateUpdateStream);
        updateAggregateStateStore(aggregateUpdateStream);
        updateCommandResultStore(aggregateUpdateStream);
    }

    private void addStateStores(StreamsBuilder streamsBuilder) {
        streamsBuilder.addStateStore(new KeyValueStoreBuilder(Stores.persistentKeyValueStore(storeName(AggregateResources.StateStoreEntity.aggregate_update)), this.serdes.aggregateKey(), this.serdes.aggregateUpdate(), Time.SYSTEM));
    }

    private KStream<K, CommandEvents<E, A>> eventResultStream(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream(topicName(AggregateResources.TopicEntity.command_request), this.commandEventsConsumed).transformValues(() -> {
            return new CommandRequestTransformer();
        }, new String[]{storeName(AggregateResources.StateStoreEntity.aggregate_update)});
    }

    private void publishEvents(KStream<K, CommandEvents<E, A>> kStream) {
        kStream.flatMapValues(commandEvents -> {
            return (List) commandEvents.eventValue().fold(nonEmptyList -> {
                return Collections.emptyList();
            }, nonEmptyList2 -> {
                return (List) nonEmptyList2.stream().collect(Collectors.toList());
            });
        }).peek((obj, valueWithSequence) -> {
            logger.debug("Writing event ({},{}) to {}", new Object[]{obj, valueWithSequence, topicName(AggregateResources.TopicEntity.event)});
        }).to(topicName(AggregateResources.TopicEntity.event), this.eventsConsumedProduced);
    }

    private KStream<K, AggregateUpdateResult<A>> aggregateUpdateStream(KStream<K, CommandEvents<E, A>> kStream) {
        return kStream.mapValues((obj, commandEvents) -> {
            return new AggregateUpdateResult(commandEvents.commandId(), commandEvents.readSequence(), commandEvents.eventValue().map(nonEmptyList -> {
                return (AggregateUpdate) nonEmptyList.fold(valueWithSequence -> {
                    return new AggregateUpdate(this.aggregator.applyEvent(commandEvents.aggregate(), valueWithSequence.value()), valueWithSequence.sequence());
                }, (aggregateUpdate, valueWithSequence2) -> {
                    return new AggregateUpdate(this.aggregator.applyEvent(aggregateUpdate.aggregate(), valueWithSequence2.value()), valueWithSequence2.sequence());
                });
            }));
        });
    }

    private void publishAggregateUpdates(KStream<K, AggregateUpdateResult<A>> kStream) {
        kStream.flatMapValues(aggregateUpdateResult -> {
            return (List) aggregateUpdateResult.updatedAggregateResult().fold(nonEmptyList -> {
                return Collections.emptyList();
            }, aggregateUpdate -> {
                return Collections.singletonList(aggregateUpdate);
            });
        }).to(topicName(AggregateResources.TopicEntity.aggregate), this.aggregatedUpdateProduced);
    }

    private void updateAggregateStateStore(KStream<K, AggregateUpdateResult<A>> kStream) {
        kStream.process(() -> {
            return new Processor<K, AggregateUpdateResult<A>>() { // from class: io.simplesource.kafka.internal.streams.EventSourcedTopology.1
                private KeyValueStore stateStore;

                public void init(ProcessorContext processorContext) {
                    this.stateStore = processorContext.getStateStore(EventSourcedTopology.this.storeName(AggregateResources.StateStoreEntity.aggregate_update));
                }

                public void process(K k, AggregateUpdateResult<A> aggregateUpdateResult) {
                    aggregateUpdateResult.updatedAggregateResult().ifSuccessful(aggregateUpdate -> {
                        this.stateStore.put(k, aggregateUpdate);
                    });
                }

                public void close() {
                }

                public /* bridge */ /* synthetic */ void process(Object obj, Object obj2) {
                    process((AnonymousClass1) obj, (AggregateUpdateResult) obj2);
                }
            };
        }, new String[]{storeName(AggregateResources.StateStoreEntity.aggregate_update)});
    }

    private void updateCommandResultStore(KStream<K, AggregateUpdateResult<A>> kStream) {
        long millis = TimeUnit.SECONDS.toMillis(this.commandResponseRetentionInSeconds);
        kStream.map((obj, aggregateUpdateResult) -> {
            return KeyValue.pair(aggregateUpdateResult.commandId(), aggregateUpdateResult);
        }).groupByKey(this.serializedAggregateUpdate).windowedBy(TimeWindows.of(millis).advanceBy(millis / 3)).reduce((aggregateUpdateResult2, aggregateUpdateResult3) -> {
            return aggregateUpdateResult3;
        }, materializedWindow(storeName(AggregateResources.StateStoreEntity.command_response)));
    }

    private Materialized<UUID, AggregateUpdateResult<A>, WindowStore<Bytes, byte[]>> materializedWindow(String str) {
        return Materialized.as(str).withKeySerde(this.serdes.commandResponseKey()).withValueSerde(this.serdes.updateResult());
    }

    private String topicName(AggregateResources.TopicEntity topicEntity) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().topicName(this.aggregateSpec.aggregateName(), topicEntity.name());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String storeName(AggregateResources.StateStoreEntity stateStoreEntity) {
        return this.aggregateSpec.serialization().resourceNamingStrategy().storeName(this.aggregateSpec.aggregateName(), stateStoreEntity.name());
    }
}
