package org.axonframework.extensions.mongo.eventsourcing.eventstore;

import com.mongodb.BasicDBObject;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.FindOneAndReplaceOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.axonframework.common.Assert;
import org.axonframework.common.DateTimeUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.DomainEventData;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventData;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.extensions.mongo.eventsourcing.eventstore.documentperevent.EventEntryConfiguration;
import org.axonframework.serialization.SerializedObject;
import org.axonframework.serialization.Serializer;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;

/* loaded from: input_file:org/axonframework/extensions/mongo/eventsourcing/eventstore/AbstractMongoEventStorageStrategy.class */
public abstract class AbstractMongoEventStorageStrategy implements StorageStrategy {
    protected static final int ORDER_ASC = 1;
    protected static final int ORDER_DESC = -1;
    private final EventEntryConfiguration eventConfiguration;
    private final Duration lookBackTime;

    /* loaded from: input_file:org/axonframework/extensions/mongo/eventsourcing/eventstore/AbstractMongoEventStorageStrategy$TrackedMongoEventEntry.class */
    private static class TrackedMongoEventEntry<T> implements DomainEventData<T>, TrackedEventData<T> {
        private final DomainEventData<T> delegate;
        private final TrackingToken trackingToken;

        public TrackedMongoEventEntry(DomainEventData<T> domainEventData, TrackingToken trackingToken) {
            this.delegate = domainEventData;
            this.trackingToken = trackingToken;
        }

        public String getType() {
            return this.delegate.getType();
        }

        public String getAggregateIdentifier() {
            return this.delegate.getAggregateIdentifier();
        }

        public long getSequenceNumber() {
            return this.delegate.getSequenceNumber();
        }

        public TrackingToken trackingToken() {
            return this.trackingToken;
        }

        public String getEventIdentifier() {
            return this.delegate.getEventIdentifier();
        }

        public Instant getTimestamp() {
            return this.delegate.getTimestamp();
        }

        public SerializedObject<T> getMetaData() {
            return this.delegate.getMetaData();
        }

        public SerializedObject<T> getPayload() {
            return this.delegate.getPayload();
        }
    }

    public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventEntryConfiguration, Duration duration) {
        this.eventConfiguration = (EventEntryConfiguration) ObjectUtils.getOrDefault(eventEntryConfiguration, EventEntryConfiguration.getDefault());
        this.lookBackTime = (Duration) ObjectUtils.getOrDefault(duration, Duration.ofMillis(1000L));
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public void appendEvents(MongoCollection<Document> mongoCollection, List<? extends EventMessage<?>> list, Serializer serializer) {
        mongoCollection.insertMany((List) createEventDocuments(list, serializer).collect(Collectors.toList()));
    }

    protected abstract Stream<Document> createEventDocuments(List<? extends EventMessage<?>> list, Serializer serializer);

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public void appendSnapshot(MongoCollection<Document> mongoCollection, DomainEventMessage<?> domainEventMessage, Serializer serializer) {
        mongoCollection.findOneAndReplace(new BsonDocument(this.eventConfiguration.aggregateIdentifierProperty(), new BsonString(domainEventMessage.getAggregateIdentifier())), createSnapshotDocument(domainEventMessage, serializer), new FindOneAndReplaceOptions().upsert(true));
    }

    protected abstract Document createSnapshotDocument(DomainEventMessage<?> domainEventMessage, Serializer serializer);

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public void deleteSnapshots(MongoCollection<Document> mongoCollection, String str, long j) {
        mongoCollection.deleteMany(Filters.and(new Bson[]{Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str), Filters.lt(this.eventConfiguration.sequenceNumberProperty(), Long.valueOf(j))}));
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public List<? extends DomainEventData<?>> findDomainEvents(MongoCollection<Document> mongoCollection, String str, long j, int i) {
        return (List) StreamSupport.stream(mongoCollection.find(Filters.and(new Bson[]{Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str), Filters.gte(this.eventConfiguration.sequenceNumberProperty(), Long.valueOf(j))})).sort(new BasicDBObject(eventConfiguration().sequenceNumberProperty(), Integer.valueOf(ORDER_ASC))).batchSize(i).spliterator(), false).flatMap(this::extractEvents).filter(domainEventData -> {
            return domainEventData.getSequenceNumber() >= j;
        }).collect(Collectors.toList());
    }

    protected abstract Stream<? extends DomainEventData<?>> extractEvents(Document document);

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public List<? extends TrackedEventData<?>> findTrackedEvents(MongoCollection<Document> mongoCollection, TrackingToken trackingToken, int i) {
        FindIterable find;
        if (trackingToken == null) {
            find = mongoCollection.find();
        } else {
            Assert.isTrue(trackingToken instanceof MongoTrackingToken, () -> {
                return String.format("Token %s is of the wrong type", trackingToken);
            });
            MongoTrackingToken mongoTrackingToken = (MongoTrackingToken) trackingToken;
            find = mongoCollection.find(Filters.and(new Bson[]{Filters.gte(this.eventConfiguration.timestampProperty(), DateTimeUtils.formatInstant(mongoTrackingToken.getTimestamp().minus((TemporalAmount) this.lookBackTime))), Filters.nin(this.eventConfiguration.eventIdentifierProperty(), mongoTrackingToken.getKnownEventIds())}));
        }
        FindIterable batchSize = find.sort(new BasicDBObject(eventConfiguration().timestampProperty(), Integer.valueOf(ORDER_ASC)).append(eventConfiguration().sequenceNumberProperty(), Integer.valueOf(ORDER_ASC))).batchSize(i);
        AtomicReference atomicReference = new AtomicReference((MongoTrackingToken) trackingToken);
        ArrayList arrayList = new ArrayList();
        MongoCursor it = batchSize.iterator();
        while (arrayList.size() < i && it.hasNext()) {
            Stream<R> map = extractEvents((Document) it.next()).filter(domainEventData -> {
                return atomicReference.get() == null || !((MongoTrackingToken) atomicReference.get()).getKnownEventIds().contains(domainEventData.getEventIdentifier());
            }).map(domainEventData2 -> {
                return new TrackedMongoEventEntry(domainEventData2, (TrackingToken) atomicReference.updateAndGet(mongoTrackingToken2 -> {
                    return mongoTrackingToken2 == null ? MongoTrackingToken.of(domainEventData2.getTimestamp(), domainEventData2.getEventIdentifier()) : mongoTrackingToken2.advanceTo(domainEventData2.getTimestamp(), domainEventData2.getEventIdentifier(), this.lookBackTime);
                }));
            });
            arrayList.getClass();
            map.forEach((v1) -> {
                r1.add(v1);
            });
        }
        return arrayList;
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public Stream<? extends DomainEventData<?>> findSnapshots(MongoCollection<Document> mongoCollection, String str) {
        return StreamSupport.stream(mongoCollection.find(Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str)).sort(Sorts.orderBy(new Bson[]{Sorts.descending(new String[]{this.eventConfiguration.sequenceNumberProperty()})})).spliterator(), false).map(this::extractSnapshot);
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public Optional<Long> lastSequenceNumberFor(MongoCollection<Document> mongoCollection, String str) {
        return Optional.ofNullable((Document) mongoCollection.find(Filters.eq(this.eventConfiguration.aggregateIdentifierProperty(), str)).sort(Sorts.descending(new String[]{this.eventConfiguration.sequenceNumberProperty()})).first()).map(this::extractHighestSequenceNumber);
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public TrackingToken createTailToken(MongoCollection<Document> mongoCollection) {
        return (TrackingToken) Optional.ofNullable((Document) mongoCollection.find().sort(Sorts.ascending(new String[]{this.eventConfiguration.timestampProperty()})).first()).map(document -> {
            return document.get(this.eventConfiguration.timestampProperty());
        }).map(obj -> {
            return DateTimeUtils.parseInstant((String) obj);
        }).map(instant -> {
            return MongoTrackingToken.of(instant, (Map<String, Long>) Collections.emptyMap());
        }).orElse(null);
    }

    protected abstract DomainEventData<?> extractSnapshot(Document document);

    protected Long extractHighestSequenceNumber(Document document) {
        return (Long) document.get(this.eventConfiguration.sequenceNumberProperty());
    }

    @Override // org.axonframework.extensions.mongo.eventsourcing.eventstore.StorageStrategy
    public void ensureIndexes(MongoCollection<Document> mongoCollection, MongoCollection<Document> mongoCollection2) {
        mongoCollection.createIndex(new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), new IndexOptions().unique(true).name("uniqueAggregateIndex"));
        mongoCollection.createIndex(new BasicDBObject(this.eventConfiguration.timestampProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), new IndexOptions().unique(false).name("orderedEventStreamIndex"));
        mongoCollection2.createIndex(new BasicDBObject(this.eventConfiguration.aggregateIdentifierProperty(), Integer.valueOf(ORDER_ASC)).append(this.eventConfiguration.sequenceNumberProperty(), Integer.valueOf(ORDER_ASC)), new IndexOptions().unique(true).name("uniqueAggregateIndex"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public EventEntryConfiguration eventConfiguration() {
        return this.eventConfiguration;
    }
}
