package io.fluxcapacitor.javaclient.persisting.eventsourcing;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.Metadata;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingObject;
import io.fluxcapacitor.javaclient.modeling.Aggregate;
import io.fluxcapacitor.javaclient.modeling.AggregateRepository;
import io.fluxcapacitor.javaclient.persisting.caching.Cache;
import io.fluxcapacitor.javaclient.persisting.caching.NoOpCache;
import java.beans.ConstructorProperties;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/persisting/eventsourcing/EventSourcingRepository.class */
/**
 * {@link AggregateRepository} for types annotated with {@link EventSourced}.
 *
 * <p>An aggregate is rebuilt from the cache if present, otherwise from the latest snapshot plus the domain events
 * stored after it. While a command is being handled, loaded aggregates are tracked per thread and committed when the
 * current message or batch completes (depending on {@code EventSourced#commitInBatch()}). Outside of command handling
 * aggregates are created read-only.
 */
public class EventSourcingRepository implements AggregateRepository {
    private static final Logger log = LoggerFactory.getLogger(EventSourcingRepository.class);

    /** Maps an aggregate id to the key under which its model is stored in the cache. */
    private static final Function<String, String> keyFunction =
            aggregateId -> EventSourcingRepository.class.getSimpleName() + ":" + aggregateId;

    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    private final Cache cache;
    private final EventStoreSerializer serializer;
    private final EventSourcingHandlerFactory handlerFactory;

    /** Per aggregate type: a factory that creates and initializes an aggregate for a given id. */
    private final Map<Class<?>, Function<String, EventSourcedAggregate<?>>> aggregateFactory;

    /** Aggregates loaded on the current thread that still have to be committed; {@code null} when not tracking. */
    private final ThreadLocal<Collection<EventSourcedAggregate<?>>> loadedModels;

    /**
     * Creates a repository that uses a {@link DefaultEventSourcingHandlerFactory} configured with
     * {@code DeserializingMessage.defaultParameterResolvers}.
     */
    public EventSourcingRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache,
                                   EventStoreSerializer eventStoreSerializer) {
        this(eventStore, snapshotRepository, cache, eventStoreSerializer,
             new DefaultEventSourcingHandlerFactory(DeserializingMessage.defaultParameterResolvers));
    }

    /**
     * Creates a repository from its collaborators.
     *
     * @param eventStore                  store that domain events are read from and appended to
     * @param snapshotRepository          repository used for types whose snapshot period is positive
     * @param cache                       cache used for types that are marked as cached
     * @param eventStoreSerializer        serializer used to serialize applied events
     * @param eventSourcingHandlerFactory factory providing the event sourcing handler per aggregate type
     */
    @ConstructorProperties({"eventStore", "snapshotRepository", "cache", "serializer", "handlerFactory"})
    public EventSourcingRepository(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache,
                                   EventStoreSerializer eventStoreSerializer,
                                   EventSourcingHandlerFactory eventSourcingHandlerFactory) {
        this.aggregateFactory = new ConcurrentHashMap<>();
        this.loadedModels = new ThreadLocal<>();
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
        this.cache = cache;
        this.serializer = eventStoreSerializer;
        this.handlerFactory = eventSourcingHandlerFactory;
    }

    /**
     * Returns whether the given type is annotated with {@link EventSourced}.
     */
    @Override // io.fluxcapacitor.javaclient.modeling.AggregateRepository
    public boolean supports(Class<?> cls) {
        return cls.isAnnotationPresent(EventSourced.class);
    }

    /**
     * Loads the aggregate with the given id.
     *
     * <ul>
     *   <li>If {@code onlyCached} is set, only the cache is consulted and {@code null} is returned on a miss.</li>
     *   <li>If the current thread is not handling a command, a fresh read-only aggregate is returned.</li>
     *   <li>Otherwise the aggregate is tracked for the current thread, so that repeated loads of the same id return
     *   the same instance, and it is committed once the current message or batch completes.</li>
     * </ul>
     *
     * @param aggregateId   id of the aggregate
     * @param aggregateType type of the aggregate
     * @param onlyCached    whether to restrict the lookup to the cache
     * @return the aggregate, or {@code null} if {@code onlyCached} is set and nothing is cached
     */
    @Override // io.fluxcapacitor.javaclient.modeling.AggregateRepository
    @SuppressWarnings("unchecked")
    public <T> Aggregate<T> load(String aggregateId, Class<T> aggregateType, boolean onlyCached) {
        if (onlyCached) {
            // The cached model is handed out as the aggregate itself, as in the original code.
            // NOTE(review): relies on EventSourcedModel being an Aggregate - confirm.
            return (EventSourcedModel<T>) this.cache.getIfPresent(keyFunction.apply(aggregateId));
        }
        if (this.loadedModels.get() == null) {
            DeserializingMessage current = DeserializingMessage.getCurrent();
            // Only command handling gets writable, tracked aggregates. The previous check used
            // Optional.map(..).isPresent(), which was true for ANY current message regardless of its type.
            if (current == null || current.getMessageType() != MessageType.COMMAND) {
                return createAggregate(aggregateType, aggregateId);
            }
            this.loadedModels.set(Collections.asLifoQueue(new ArrayDeque<EventSourcedAggregate<?>>()));
            Runnable commitLoaded = () -> {
                Collection<EventSourcedAggregate<?>> tracked = this.loadedModels.get();
                // Stop tracking before committing so a later load on this thread starts a new unit of work.
                this.loadedModels.remove();
                tracked.forEach(aggregate -> aggregate.commit());
            };
            if (commitInBatch(aggregateType)) {
                DeserializingMessage.whenBatchCompletes(commitLoaded);
            } else {
                DeserializingMessage.whenMessageCompletes(commitLoaded);
            }
        }
        Collection<EventSourcedAggregate<?>> tracked = this.loadedModels.get();
        for (EventSourcedAggregate<?> candidate : tracked) {
            if (candidate.id.equals(aggregateId)) {
                return (Aggregate<T>) candidate;
            }
        }
        EventSourcedAggregate<T> created = createAggregate(aggregateType, aggregateId);
        tracked.add(created);
        return created;
    }

    /**
     * Creates and initializes an aggregate of the given type and id. The per-type settings (handler, cache, snapshot
     * repository, snapshot trigger and domain) are resolved once per type and reused. The aggregate is read-only when
     * no aggregates are being tracked on the current thread.
     */
    @SuppressWarnings("unchecked")
    protected <T> EventSourcedAggregate<T> createAggregate(Class<T> aggregateType, String aggregateId) {
        Function<String, EventSourcedAggregate<?>> factory =
                this.aggregateFactory.computeIfAbsent(aggregateType, type -> {
                    EventSourcingHandler<T> handler = this.handlerFactory.forType(aggregateType);
                    Cache aggregateCache = isCached(aggregateType) ? this.cache : NoOpCache.INSTANCE;
                    SnapshotRepository snapshots = snapshotRepository(aggregateType);
                    SnapshotTrigger trigger = snapshotTrigger(aggregateType);
                    String aggregateDomain = domain(aggregateType);
                    return id -> {
                        EventSourcedAggregate<T> aggregate = new EventSourcedAggregate<>(
                                aggregateType, handler, aggregateCache, this.serializer, this.eventStore, snapshots,
                                trigger, aggregateDomain, this.loadedModels.get() == null, id);
                        aggregate.initialize();
                        return aggregate;
                    };
                });
        return (EventSourcedAggregate<T>) factory.apply(aggregateId);
    }

    /**
     * Returns the configured snapshot repository if the type's snapshot period is positive, otherwise a no-op one.
     */
    protected SnapshotRepository snapshotRepository(Class<?> cls) {
        return snapshotPeriod(cls) > 0 ? this.snapshotRepository : NoOpSnapshotRepository.INSTANCE;
    }

    /**
     * Returns a periodic snapshot trigger if the type's snapshot period is positive, otherwise a trigger that never
     * fires.
     */
    protected SnapshotTrigger snapshotTrigger(Class<?> cls) {
        int period = snapshotPeriod(cls);
        return period > 0 ? new PeriodicSnapshotTrigger(period) : NoSnapshotTrigger.INSTANCE;
    }

    /**
     * Returns the {@code cached} setting of the type's {@link EventSourced} annotation, or the annotation's declared
     * default if the type is not annotated.
     */
    protected boolean isCached(Class<?> cls) {
        EventSourced annotation = cls.getAnnotation(EventSourced.class);
        return annotation != null ? annotation.cached() : (Boolean) defaultValue("cached");
    }

    /**
     * Returns the domain configured on the type's {@link EventSourced} annotation, or the simple class name if the
     * type is not annotated or the configured domain is empty.
     */
    protected String domain(Class<?> cls) {
        EventSourced annotation = cls.getAnnotation(EventSourced.class);
        if (annotation != null && !annotation.domain().isEmpty()) {
            return annotation.domain();
        }
        return cls.getSimpleName();
    }

    /** Returns the snapshot period of the type, falling back to the annotation's declared default. */
    private static int snapshotPeriod(Class<?> cls) {
        EventSourced annotation = cls.getAnnotation(EventSourced.class);
        return annotation != null ? annotation.snapshotPeriod() : (Integer) defaultValue("snapshotPeriod");
    }

    /** Returns whether the type commits per batch, falling back to the annotation's declared default. */
    private static boolean commitInBatch(Class<?> cls) {
        EventSourced annotation = cls.getAnnotation(EventSourced.class);
        return annotation != null ? annotation.commitInBatch() : (Boolean) defaultValue("commitInBatch");
    }

    /** Reads the declared default of an {@link EventSourced} attribute via reflection. */
    private static Object defaultValue(String attribute) {
        try {
            return EventSourced.class.getMethod(attribute).getDefaultValue();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(
                    "Attribute " + attribute + " is missing on " + EventSourced.class.getName(), e);
        }
    }

    /**
     * Aggregate whose state is derived by applying events to an {@link EventSourcedModel}. Applied events are
     * buffered and only stored when {@link #commit()} is invoked.
     *
     * @param <T> type of the aggregate's model
     */
    public static class EventSourcedAggregate<T> implements Aggregate<T> {
        private final Class<T> aggregateType;
        private final EventSourcingHandler<T> eventSourcingHandler;
        private final Cache cache;
        private final EventStoreSerializer serializer;
        private final EventStore eventStore;
        private final SnapshotRepository snapshotRepository;
        private final SnapshotTrigger snapshotTrigger;
        private final String domain;
        /** Events applied since the last commit, in application order. */
        private final List<DeserializingMessage> unpublishedEvents = new ArrayList<>();
        private final boolean readOnly;
        private final String id;
        private EventSourcedModel<T> model;

        @ConstructorProperties({"aggregateType", "eventSourcingHandler", "cache", "serializer", "eventStore", "snapshotRepository", "snapshotTrigger", "domain", "readOnly", "id"})
        public EventSourcedAggregate(Class<T> aggregateType, EventSourcingHandler<T> eventSourcingHandler,
                                     Cache cache, EventStoreSerializer serializer, EventStore eventStore,
                                     SnapshotRepository snapshotRepository, SnapshotTrigger snapshotTrigger,
                                     String domain, boolean readOnly, String id) {
            this.aggregateType = aggregateType;
            this.eventSourcingHandler = eventSourcingHandler;
            this.cache = cache;
            this.serializer = serializer;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
            this.domain = domain;
            this.readOnly = readOnly;
            this.id = id;
        }

        /**
         * Loads the current model: taken from the cache when present, otherwise rebuilt from the latest snapshot (or
         * an empty model) plus the domain events stored after it.
         */
        @SuppressWarnings("unchecked")
        protected void initialize() {
            EventSourcedModel<T> cached =
                    (EventSourcedModel<T>) this.cache.getIfPresent(keyFunction.apply(this.id));
            this.model = cached != null ? cached : loadFromStore();
        }

        /** Rebuilds the model from the snapshot repository and the event store. */
        private EventSourcedModel<T> loadFromStore() {
            // orElseGet: only build an empty model when there is no snapshot.
            EventSourcedModel<T> current = this.snapshotRepository.<T>getSnapshot(this.id)
                    .orElseGet(() -> EventSourcedModel.<T>builder().id(this.id).build());
            List<DeserializingMessage> events =
                    this.eventStore.getDomainEvents(this.id, current.sequenceNumber())
                            .collect(Collectors.toList());
            for (DeserializingMessage event : events) {
                current = current.toBuilder()
                        .sequenceNumber(current.sequenceNumber() + 1)
                        .lastEventId(event.getSerializedObject().getMessageId())
                        .timestamp(Instant.ofEpochMilli(event.getSerializedObject().getTimestamp()))
                        .model(this.eventSourcingHandler.invoke(current.get(), event))
                        .build();
            }
            return current;
        }

        /**
         * Applies the given event message to this aggregate. The aggregate id and type are written into the message's
         * metadata, the model is advanced by one sequence number, and the event is buffered until {@link #commit()}.
         *
         * @param message the event message to apply
         * @return this aggregate
         * @throws UnsupportedOperationException if this aggregate is read-only
         */
        @Override // io.fluxcapacitor.javaclient.modeling.Aggregate
        public Aggregate<T> apply(Message message) {
            if (this.readOnly) {
                throw new UnsupportedOperationException(String.format("Not allowed to apply a %s. The model is readonly.", message));
            }
            Objects.requireNonNull(message, "message");
            Metadata metadata = message.getMetadata();
            metadata.put(Aggregate.AGGREGATE_ID_METADATA_KEY, this.id);
            metadata.put(Aggregate.AGGREGATE_TYPE_METADATA_KEY, this.aggregateType.getName());
            // Serialize after the metadata has been enriched so the stored event carries the aggregate info.
            SerializedMessage serialized = this.serializer.serialize(message);
            DeserializingMessage event = new DeserializingMessage(
                    new DeserializingObject<>(serialized, message::getPayload), MessageType.EVENT);
            this.model = this.model.toBuilder()
                    .sequenceNumber(this.model.sequenceNumber() + 1)
                    .lastEventId(message.getMessageId())
                    .timestamp(message.getTimestamp())
                    .model(this.eventSourcingHandler.invoke(this.model.get(), event))
                    .build();
            this.unpublishedEvents.add(event);
            return this;
        }

        /** Returns the current state of the model. */
        @Override // io.fluxcapacitor.javaclient.modeling.Aggregate
        public T get() {
            return this.model.get();
        }

        /** Returns the id of the last event applied to the model. */
        @Override // io.fluxcapacitor.javaclient.modeling.Aggregate
        public String lastEventId() {
            return this.model.lastEventId();
        }

        /** Returns the timestamp of the last event applied to the model. */
        @Override // io.fluxcapacitor.javaclient.modeling.Aggregate
        public Instant timestamp() {
            return this.model.timestamp();
        }

        /**
         * Caches the model, stores the buffered events and, if the snapshot trigger says so, stores a snapshot. Does
         * nothing when no events were applied. Failures are logged and the cache entry is invalidated; they are
         * deliberately not rethrown. The event buffer is cleared in all cases.
         */
        protected void commit() {
            if (this.unpublishedEvents.isEmpty()) {
                return;
            }
            String cacheKey = keyFunction.apply(this.model.id());
            try {
                this.cache.put(cacheKey, this.model);
                // Pass a copy: the buffer is cleared in the finally block below.
                this.eventStore.storeDomainEvents(this.model.id(), this.domain, this.model.sequenceNumber(),
                                                  new ArrayList<>(this.unpublishedEvents));
                if (this.snapshotTrigger.shouldCreateSnapshot(this.model, this.unpublishedEvents)) {
                    this.snapshotRepository.storeSnapshot(this.model);
                }
            } catch (Exception e) {
                log.error("Failed to commit new events of aggregate {}", this.model.id(), e);
                // The cached model may be ahead of what was actually stored; drop it.
                this.cache.invalidate(cacheKey);
            } finally {
                this.unpublishedEvents.clear();
            }
        }
    }
}
