package org.projectnessie.versioned.mongodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.bson.BsonBinary;
import org.bson.BsonReader;
import org.bson.BsonWriter;
import org.bson.Document;
import org.bson.codecs.Codec;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecProvider;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.projectnessie.versioned.impl.EntityStoreHelper;
import org.projectnessie.versioned.impl.condition.ConditionExpression;
import org.projectnessie.versioned.impl.condition.UpdateExpression;
import org.projectnessie.versioned.store.Id;
import org.projectnessie.versioned.store.LoadOp;
import org.projectnessie.versioned.store.LoadStep;
import org.projectnessie.versioned.store.NotFoundException;
import org.projectnessie.versioned.store.SaveOp;
import org.projectnessie.versioned.store.Store;
import org.projectnessie.versioned.store.StoreOperationException;
import org.projectnessie.versioned.store.ValueType;
import org.projectnessie.versioned.tiered.BaseValue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/projectnessie/versioned/mongodb/MongoDBStore.class */
public class MongoDBStore implements Store {

    /** Maximum number of IDs placed in a single {@code Filters.in} query by {@link #load(LoadStep)}. */
    @VisibleForTesting
    static final int LOAD_SIZE = 1000;

    /** Shared codec that reads and writes an {@link Id} as BSON binary data. */
    static final IdCodec ID_CODEC_INSTANCE = new IdCodec();

    /** Configuration supplied at construction time (database name, table prefix, timeout, ...). */
    private final MongoStoreConfig config;

    /** Client settings built once in the constructor and used by {@link #start()} to create the client. */
    private final MongoClientSettings mongoClientSettings;

    /** Upper bound applied to every blocking wait on a reactive driver call. */
    private final Duration timeout;

    /** Created by {@link #start()}; stays {@code null} until then. */
    private MongoClient mongoClient;

    /** Resolved by {@link #start()} from the configured database name. */
    private MongoDatabase mongoDatabase;

    /** One collection per value type; empty until {@link #start()} replaces it with the populated map. */
    private Map<ValueType<?>, MongoCollection<Document>> collections = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/projectnessie/versioned/mongodb/MongoDBStore$CollectionLoadIds.class */
    /**
     * One batch of a bulk load: the IDs to fetch together with the value type they belong to and
     * the collection that has to be queried for them.
     */
    public static class CollectionLoadIds {
        /** Value type shared by every ID in this batch. */
        final ValueType<?> type;
        /** Collection the batch is queried against. */
        final MongoCollection<Document> collection;
        /** IDs requested by this batch. */
        final List<Id> ids;

        CollectionLoadIds(ValueType<?> type, MongoCollection<Document> collection, List<Id> ids) {
            this.type = type;
            this.collection = collection;
            this.ids = ids;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/projectnessie/versioned/mongodb/MongoDBStore$IdCodec.class */
    public static class IdCodec implements Codec<Id> {
        private IdCodec() {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Id m2decode(BsonReader bsonReader, DecoderContext decoderContext) {
            return Id.of(bsonReader.readBinaryData().getData());
        }

        public void encode(BsonWriter bsonWriter, Id id, EncoderContext encoderContext) {
            bsonWriter.writeBinaryData(new BsonBinary(id.toBytes()));
        }

        public Class<Id> getEncoderClass() {
            return Id.class;
        }
    }

    /**
     * Creates a store from the given configuration. Only the client settings are prepared here;
     * the client itself is created by {@link #start()}.
     *
     * @param mongoStoreConfig connection string, timeout and naming configuration
     */
    public MongoDBStore(MongoStoreConfig mongoStoreConfig) {
        this.config = mongoStoreConfig;
        this.timeout = Duration.ofMillis(mongoStoreConfig.getTimeoutMs());

        // Serves the Id codec and defers every other class to the driver's default registry.
        final CodecProvider idCodecProvider = new CodecProvider() {
            @Override
            @SuppressWarnings("unchecked")
            public <T> Codec<T> get(Class<T> cls, CodecRegistry codecRegistry) {
                if (cls == Id.class) {
                    // Safe: T is Id whenever this branch is taken.
                    return (Codec<T>) MongoDBStore.ID_CODEC_INSTANCE;
                }
                // null tells the registry to consult the next provider.
                return null;
            }
        };
        final CodecRegistry codecRegistry =
            CodecRegistries.fromProviders(idCodecProvider, MongoClientSettings.getDefaultCodecRegistry());

        this.mongoClientSettings = MongoClientSettings.builder()
            .applyConnectionString(new ConnectionString(mongoStoreConfig.getConnectionString()))
            .codecRegistry(codecRegistry)
            .writeConcern(WriteConcern.MAJORITY)
            .build();
    }

    /**
     * Creates the client, resolves the configured database and looks up one collection per
     * {@link ValueType}. When the configuration asks for database initialization, the minimum
     * set of entities is stored through {@link #putIfAbsent(SaveOp)}.
     */
    @Override
    public void start() {
        this.mongoClient = MongoClients.create(this.mongoClientSettings);
        this.mongoDatabase = this.mongoClient.getDatabase(this.config.getDatabaseName());

        // Collection names are derived from the value type and the configured table prefix.
        final ImmutableMap.Builder<ValueType<?>, MongoCollection<Document>> byType = ImmutableMap.builder();
        for (ValueType<?> type : ValueType.values()) {
            byType.put(type, this.mongoDatabase.getCollection(type.getTableName(this.config.getTablePrefix())));
        }
        this.collections = byType.build();

        if (this.config.initializeDatabase()) {
            EntityStoreHelper.storeMinimumEntities(this::putIfAbsent);
        }
    }

    /** Closes the client if {@link #start()} created one; otherwise does nothing. */
    @Override
    public void close() {
        if (this.mongoClient == null) {
            // No client was ever created, so there is nothing to release.
            return;
        }
        this.mongoClient.close();
    }

    /**
     * Executes the given load step and every step chained after it via {@code getNext()}.
     *
     * <p>For each step the operations are grouped by value type, their IDs are queried in batches
     * of at most {@link #LOAD_SIZE}, and every returned document is passed to the receiver of the
     * operation with the same ID, after which that operation is marked done. The call blocks for
     * at most the configured timeout per step.
     *
     * @param loadStep first step to execute; {@code null} means there is nothing to load
     * @throws NotFoundException if a step finishes while some of its requested IDs were not returned
     * @throws StoreOperationException if a returned document does not belong to any pending operation
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // the receiver hand-off below has to go through the raw LoadOp
    public void load(LoadStep loadStep) throws NotFoundException {
        for (LoadStep step = loadStep; step != null; step = step.getNext().orElse(null)) {
            // Operations still waiting for their document; an entry is removed when its document arrives.
            final Map<Id, LoadOp<?>> pending =
                step.getOps().collect(Collectors.toMap(LoadOp::getId, Function.identity()));

            Flux.fromStream(step.getOps())
                .groupBy(LoadOp::getValueType)
                .flatMap(group -> {
                    final ValueType<?> type = group.key();
                    final MongoCollection<Document> collection = getCollection(type);
                    return group.map(LoadOp::getId)
                        .buffer(LOAD_SIZE)
                        .map(ids -> new CollectionLoadIds(type, collection, ids));
                })
                .flatMap(batch -> batch.collection.find(Filters.in("id", batch.ids)))
                .handle((document, sink) -> {
                    final Id id = MongoSerDe.deserializeId(document, "id");
                    final LoadOp op = pending.remove(id);
                    if (op == null) {
                        // E.g. a second document carrying an already delivered ID: fail with context
                        // instead of a bare NullPointerException.
                        sink.error(new StoreOperationException(
                            String.format("Retrieved unexpected object with ID: %s", id)));
                        return;
                    }
                    MongoSerDe.produceToConsumer(document, op.getValueType(), op.getReceiver());
                    op.done();
                })
                .blockLast(this.timeout);

            // Whatever is still pending was not found in the database.
            if (!pending.isEmpty()) {
                final String missing = pending.keySet().stream()
                    .map(Object::toString)
                    .collect(Collectors.joining(", "));
                throw new NotFoundException(String.format("Requested object IDs missing: %s", missing));
            }
        }
    }

    /**
     * Stores the entity only when no document with the same ID exists, using an upsert whose
     * update document is built with {@code $setOnInsert}.
     *
     * @param saveOp entity to store
     * @return {@code true} if the call inserted a new document, {@code false} if a document with
     *     that ID was already present or the driver returned no result
     */
    @Override
    public <C extends BaseValue<C>> boolean putIfAbsent(SaveOp<C> saveOp) {
        final MongoCollection<Document> collection = getCollection(saveOp.getType());
        final UpdateResult result = Mono.from(collection.updateOne(
                Filters.eq("id", saveOp.getId()),
                MongoSerDe.bsonForValueType(saveOp, "$setOnInsert"),
                new UpdateOptions().upsert(true)))
            .block(this.timeout);
        // An upserted ID is only reported when the document did not exist before.
        return result != null && result.getUpsertedId() != null;
    }

    /**
     * Stores the entity unconditionally, inserting it when absent and overwriting it otherwise
     * (upsert with {@code $set}).
     *
     * @param saveOp entity to store
     * @param optional condition for the write; must be empty because conditions are not supported
     * @throws UnsupportedOperationException if a condition is present
     * @throws StoreOperationException if the write neither matched an existing document nor
     *     inserted a new one, or the driver returned no result
     */
    @Override
    public <C extends BaseValue<C>> void put(SaveOp<C> saveOp, Optional<ConditionExpression> optional) {
        if (optional.isPresent()) {
            throw new UnsupportedOperationException("ConditionExpressions are not supported with MongoDB yet.");
        }

        final MongoCollection<Document> collection = getCollection(saveOp.getType());
        final UpdateResult result = Mono.from(collection.updateOne(
                Filters.eq("id", saveOp.getId()),
                MongoSerDe.bsonForValueType(saveOp, "$set"),
                new UpdateOptions().upsert(true)))
            .block(this.timeout);

        // Success is "an existing document was matched" (modified or already identical) or
        // "a new document was upserted". A modified count other than zero is a success, not a failure.
        final boolean written = result != null
            && (result.getMatchedCount() > 0 || result.getUpsertedId() != null);
        if (!written) {
            throw new StoreOperationException(
                String.format("Update of %s %s did not succeed", saveOp.getType().name(), saveOp.getId()));
        }
    }

    /**
     * Not implemented for this store.
     *
     * @param valueType type of the entity to delete
     * @param id ID of the entity to delete
     * @param optional condition for the delete
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public <C extends BaseValue<C>> boolean delete(
            ValueType<C> valueType, Id id, Optional<ConditionExpression> optional) {
        throw new UnsupportedOperationException();
    }

    /**
     * Inserts all given entities, issuing one unordered {@code insertMany} per value type and
     * blocking for at most the configured timeout until every insert has completed.
     *
     * @param list entities to insert; an empty list results in no database call
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // SaveOps are written through a MongoCollection<Document>
    public void save(List<SaveOp<?>> list) {
        final Map<ValueType<?>, List<SaveOp<?>>> opsByType =
            list.stream().collect(Collectors.groupingBy(SaveOp::getType));

        Flux.fromIterable(opsByType.entrySet())
            .flatMap(entry ->
                // The write collection's codec registry encodes SaveOp instances directly, so the
                // list is handed over raw.
                writeCollection(entry.getKey())
                    .insertMany((List) entry.getValue(), new InsertManyOptions().ordered(false)))
            .blockLast(this.timeout);
    }

    /**
     * Loads the document with the given ID and feeds it into the supplied consumer.
     *
     * @param valueType type of the entity, which selects the collection to query
     * @param id ID of the entity to load
     * @param c consumer that receives the loaded entity
     * @throws NotFoundException if the query yields no document for the ID
     */
    @Override
    public <C extends BaseValue<C>> void loadSingle(ValueType<C> valueType, Id id, C c) {
        final MongoCollection<Document> collection = getCollection(valueType);
        final Document found = Mono.from(collection.find(Filters.eq("id", id))).block(this.timeout);
        if (found != null) {
            MongoSerDe.produceToConsumer(found, valueType, c);
            return;
        }
        throw new NotFoundException(String.format("Unable to load item with ID: %s", id));
    }

    /**
     * Not implemented for this store.
     *
     * @param valueType type of the entity to update
     * @param id ID of the entity to update
     * @param updateExpression update to apply
     * @param optional condition for the update
     * @param optional2 optional consumer for the updated entity
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override
    public <C extends BaseValue<C>> boolean update(
            ValueType<C> valueType,
            Id id,
            UpdateExpression updateExpression,
            Optional<ConditionExpression> optional,
            Optional<BaseValue<C>> optional2) throws NotFoundException {
        throw new UnsupportedOperationException();
    }

    /**
     * Streams every document of the collection belonging to the given value type.
     *
     * @param valueType type whose collection is read
     * @return a stream with one acceptor per document; applying an acceptor to a consumer
     *     produces that document into the consumer
     */
    @Override
    public <C extends BaseValue<C>> Stream<Store.Acceptor<C>> getValues(ValueType<C> valueType) {
        final MongoCollection<Document> collection = getCollection(valueType);
        final Stream<Document> documents = Flux.from(collection.find()).toStream();
        return documents.map(document ->
            receiver -> MongoSerDe.produceToConsumer(document, valueType, receiver));
    }

    /**
     * Test hook: in every known collection, deletes all documents whose {@code _id} differs from
     * the string {@code "s"}, blocking for at most the configured timeout.
     */
    @VisibleForTesting
    void resetCollections() {
        final Collection<MongoCollection<Document>> all = this.collections.values();
        Flux.fromIterable(all)
            .flatMap(collection -> collection.deleteMany(Filters.ne("_id", "s")))
            .blockLast(this.timeout);
    }

    /**
     * Returns the collection for the given value type, configured with a write-only codec that
     * serializes {@link SaveOp} instances through {@code MongoSerDe.serializeEntity}.
     *
     * @param valueType type whose collection is wanted
     * @return the collection with the encode-only codec registry attached
     */
    private MongoCollection<Document> writeCollection(ValueType<?> valueType) {
        return getCollection(valueType, new Codec<SaveOp>() {
            /** Decoding is not supported; this codec is only used for writes. */
            @Override
            public SaveOp decode(BsonReader bsonReader, DecoderContext decoderContext) {
                throw new UnsupportedOperationException();
            }

            @Override
            public void encode(BsonWriter bsonWriter, SaveOp saveOp, EncoderContext encoderContext) {
                MongoSerDe.serializeEntity(bsonWriter, saveOp);
            }

            @Override
            public Class<SaveOp> getEncoderClass() {
                return SaveOp.class;
            }
        });
    }

    /**
     * Returns the collection for the given value type with a codec registry that answers
     * {@link Id} lookups with the shared ID codec and every other lookup with {@code codec}.
     *
     * @param valueType type whose collection is wanted
     * @param codec codec returned for every class other than {@link Id}
     * @return the collection bound to that registry
     */
    private MongoCollection<Document> getCollection(ValueType<?> valueType, final Codec<?> codec) {
        return getCollection(valueType).withCodecRegistry(new CodecRegistry() {
            @Override
            public <T> Codec<T> get(Class<T> cls, CodecRegistry codecRegistry) {
                return get(cls);
            }

            @Override
            @SuppressWarnings("unchecked")
            public <T> Codec<T> get(Class<T> cls) {
                if (cls == Id.class) {
                    // Safe: T is Id whenever this branch is taken.
                    return (Codec<T>) MongoDBStore.ID_CODEC_INSTANCE;
                }
                // Deliberately unchecked: the supplied codec is handed out for any requested class.
                return (Codec<T>) codec;
            }
        });
    }

    /**
     * Looks up the collection registered for the given value type.
     *
     * @param valueType type whose collection is wanted
     * @return the matching collection
     * @throws NullPointerException if no collection is registered for the type
     */
    private MongoCollection<Document> getCollection(ValueType<?> valueType) {
        final MongoCollection<Document> collection = this.collections.get(valueType);
        return Preconditions.checkNotNull(collection, "Unsupported Entity type: %s", valueType.name());
    }
}
