package com.dremio.nessie.versioned.store.mongodb;

import com.dremio.nessie.versioned.impl.condition.ConditionExpression;
import com.dremio.nessie.versioned.impl.condition.UpdateExpression;
import com.dremio.nessie.versioned.store.HasId;
import com.dremio.nessie.versioned.store.Id;
import com.dremio.nessie.versioned.store.LoadStep;
import com.dremio.nessie.versioned.store.NotFoundException;
import com.dremio.nessie.versioned.store.SaveOp;
import com.dremio.nessie.versioned.store.Store;
import com.dremio.nessie.versioned.store.ValueType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertManyOptions;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.bson.BsonDocument;
import org.bson.BsonDocumentWriter;
import org.bson.codecs.EncoderContext;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.conversions.Bson;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:com/dremio/nessie/versioned/store/mongodb/MongoDBStore.class */
public class MongoDBStore implements Store {
    private final MongoStoreConfig config;
    private final MongoClientSettings mongoClientSettings;
    private MongoClient mongoClient;
    private MongoDatabase mongoDatabase;
    private final long timeoutMs;
    private Map<ValueType, MongoCollection<? extends HasId>> collections = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/dremio/nessie/versioned/store/mongodb/MongoDBStore$ObservableSubscriber.class */
    public static class ObservableSubscriber<T> implements Subscriber<T> {
        private Throwable error;
        private volatile Subscription subscription;
        private final List<T> received = new ArrayList();
        private final CountDownLatch latch = new CountDownLatch(1);
        private boolean hasRequested = false;

        ObservableSubscriber() {
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }

        public void onNext(T t) {
            this.received.add(t);
        }

        public void onError(Throwable th) {
            this.error = th;
            onComplete();
        }

        public void onComplete() {
            this.latch.countDown();
        }

        T first() {
            if (this.received.isEmpty()) {
                return null;
            }
            return this.received.get(0);
        }

        List<T> getReceived() {
            return this.received;
        }

        void request() {
            this.subscription.request(2147483647L);
            this.hasRequested = true;
        }

        ObservableSubscriber<T> await(long j) throws Throwable {
            if (!this.hasRequested) {
                this.subscription.request(2147483647L);
            }
            if (!this.latch.await(j, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException("Publisher onComplete timed out");
            }
            if (null != this.error) {
                throw new ExecutionException(this.error);
            }
            return this;
        }
    }

    /* loaded from: input_file:com/dremio/nessie/versioned/store/mongodb/MongoDBStore$UpdateEntityBson.class */
    private static class UpdateEntityBson<T> implements Bson {
        private final T value;
        private final Class<T> valueClass;

        public <V> UpdateEntityBson(Class<T> cls, T t) {
            this.valueClass = cls;
            this.value = t;
        }

        public <TDocument> BsonDocument toBsonDocument(Class<TDocument> cls, CodecRegistry codecRegistry) {
            BsonDocumentWriter bsonDocumentWriter = new BsonDocumentWriter(new BsonDocument());
            bsonDocumentWriter.writeStartDocument();
            bsonDocumentWriter.writeName("$setOnInsert");
            codecRegistry.get(this.valueClass).encode(bsonDocumentWriter, this.value, EncoderContext.builder().build());
            bsonDocumentWriter.writeEndDocument();
            return bsonDocumentWriter.getDocument();
        }
    }

    public MongoDBStore(MongoStoreConfig mongoStoreConfig) {
        this.config = mongoStoreConfig;
        this.timeoutMs = mongoStoreConfig.getTimeoutMs();
        this.mongoClientSettings = MongoClientSettings.builder().applyConnectionString(new ConnectionString(mongoStoreConfig.getConnectionString())).codecRegistry(CodecRegistries.fromProviders(new org.bson.codecs.configuration.CodecProvider[]{new CodecProvider(), PojoCodecProvider.builder().automatic(true).build(), MongoClientSettings.getDefaultCodecRegistry()})).writeConcern(WriteConcern.MAJORITY).build();
    }

    public void start() {
        this.mongoClient = MongoClients.create(this.mongoClientSettings);
        this.mongoDatabase = this.mongoClient.getDatabase(this.config.getDatabaseName());
        this.collections = (Map) Stream.of((Object[]) ValueType.values()).collect(ImmutableMap.toImmutableMap(valueType -> {
            return valueType;
        }, valueType2 -> {
            return this.mongoDatabase.getCollection(valueType2.getTableName(this.config.getTablePrefix()), valueType2.getObjectClass());
        }));
    }

    public void close() {
        if (null != this.mongoClient) {
            this.mongoClient.close();
        }
    }

    public void load(LoadStep loadStep) throws NotFoundException {
        throw new UnsupportedOperationException();
    }

    public <V> boolean putIfAbsent(ValueType valueType, V v) {
        return ((UpdateResult) await(getCollection(valueType).updateOne(Filters.eq("id", ((HasId) v).getId()), new UpdateEntityBson(valueType.getObjectClass(), v), new UpdateOptions().upsert(true))).first()).getUpsertedId() != null;
    }

    public <V> void put(ValueType valueType, V v, Optional<ConditionExpression> optional) {
        Preconditions.checkArgument(valueType.getObjectClass().isAssignableFrom(v.getClass()), "ValueType %s doesn't extend expected type %s.", v.getClass().getName(), valueType.getObjectClass().getName());
        if (optional.isPresent()) {
            throw new UnsupportedOperationException("ConditionExpressions are not supported with MongoDB yet.");
        }
        await(getCollection(valueType).replaceOne(Filters.eq("id", ((HasId) v).getId()), v, new ReplaceOptions().upsert(true)));
    }

    public boolean delete(ValueType valueType, Id id, Optional<ConditionExpression> optional) {
        throw new UnsupportedOperationException();
    }

    public void save(List<SaveOp<?>> list) {
        ListMultimap transformValues = Multimaps.transformValues(Multimaps.index(list, saveOp -> {
            return this.collections.get(saveOp.getType());
        }), (v0) -> {
            return v0.getValue();
        });
        ArrayList arrayList = new ArrayList();
        for (MongoCollection mongoCollection : transformValues.keySet()) {
            ObservableSubscriber observableSubscriber = new ObservableSubscriber();
            arrayList.add(observableSubscriber);
            mongoCollection.insertMany(transformValues.get(mongoCollection), new InsertManyOptions().ordered(false)).subscribe(observableSubscriber);
            observableSubscriber.request();
        }
        arrayList.forEach(observableSubscriber2 -> {
            try {
                observableSubscriber2.await(this.timeoutMs);
            } catch (Throwable th) {
                Throwables.throwIfUnchecked(th);
                throw new RuntimeException(th);
            }
        });
    }

    public <V> V loadSingle(ValueType valueType, Id id) {
        V v = (V) await(getCollection(valueType).find(Filters.eq("id", id))).first();
        if (null == v) {
            throw new RuntimeException("Unable to load item with ID: " + id);
        }
        return v;
    }

    public <V> Optional<V> update(ValueType valueType, Id id, UpdateExpression updateExpression, Optional<ConditionExpression> optional) throws NotFoundException {
        throw new UnsupportedOperationException();
    }

    public <V> Stream<V> getValues(Class<V> cls, ValueType valueType) {
        return await(getCollection(ValueType.REF).find()).getReceived().stream();
    }

    @VisibleForTesting
    void resetCollections() {
        this.collections.forEach((valueType, mongoCollection) -> {
            await(mongoCollection.deleteMany(Filters.ne("_id", "s")));
        });
    }

    private <T> ObservableSubscriber<T> await(Publisher<T> publisher) {
        try {
            ObservableSubscriber observableSubscriber = new ObservableSubscriber();
            publisher.subscribe(observableSubscriber);
            return observableSubscriber.await(this.timeoutMs);
        } catch (Throwable th) {
            Throwables.throwIfUnchecked(th);
            throw new RuntimeException(th);
        }
    }

    private MongoCollection getCollection(ValueType valueType) {
        MongoCollection<? extends HasId> mongoCollection = this.collections.get(valueType);
        if (null == mongoCollection) {
            throw new UnsupportedOperationException(String.format("Unsupported Entity type: %s", valueType.name()));
        }
        return mongoCollection;
    }
}
