package it.agilelab.darwin.connector.mongo;

import com.mongodb.BasicDBObject;
import com.mongodb.ErrorCategory;
import com.mongodb.MongoWriteException;
import it.agilelab.darwin.common.Connector;
import it.agilelab.darwin.common.Logging;
import it.agilelab.darwin.connector.mongo.ConfigurationMongoModels;
import it.agilelab.darwin.manager.SchemaPayloadPair;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.avro.Schema;
import org.bson.BsonDocument;
import org.bson.BsonValue;
import org.mongodb.scala.MongoClient;
import org.mongodb.scala.MongoCollection;
import org.mongodb.scala.bson.BsonInt64$;
import org.mongodb.scala.bson.BsonString$;
import org.mongodb.scala.bson.DefaultHelper$DefaultsTo$;
import org.mongodb.scala.bson.collection.immutable.Document;
import org.mongodb.scala.package$;
import org.slf4j.Logger;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Traversable;
import scala.collection.Traversable$;
import scala.collection.TraversableLike;
import scala.collection.immutable.StringOps;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Failure;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: MongoConnector.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015f\u0001B\u0007\u000f\u0001eA\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IA\u000b\u0005\tg\u0001\u0011\t\u0011)A\u0005i!)\u0001\n\u0001C\u0001\u0013\")a\n\u0001C\u0005\u001f\")\u0011\r\u0001C!E\")a\u000f\u0001C\u0005o\"9\u0011q\f\u0001\u0005B\u0005\u0005\u0004bBA7\u0001\u0011%\u0011q\u000e\u0005\b\u0003\u000b\u0003A\u0011IAD\u0011\u001d\tI\t\u0001C!\u0003\u0017Cq!a%\u0001\t\u0003\n)\nC\u0004\u0002\u0018\u0002!\t%!'\u0003\u001d5{gnZ8D_:tWm\u0019;pe*\u0011q\u0002E\u0001\u0006[>twm\u001c\u0006\u0003#I\t\u0011bY8o]\u0016\u001cGo\u001c:\u000b\u0005M!\u0012A\u00023be^LgN\u0003\u0002\u0016-\u0005A\u0011mZ5mK2\f'MC\u0001\u0018\u0003\tIGo\u0001\u0001\u0014\t\u0001Q\u0002E\n\t\u00037yi\u0011\u0001\b\u0006\u0002;\u0005)1oY1mC&\u0011q\u0004\b\u0002\u0007\u0003:L(+\u001a4\u0011\u0005\u0005\"S\"\u0001\u0012\u000b\u0005\r\u0012\u0012AB2p[6|g.\u0003\u0002&E\tI1i\u001c8oK\u000e$xN\u001d\t\u0003C\u001dJ!\u0001\u000b\u0012\u0003\u000f1{wmZ5oO\u0006YQn\u001c8h_\u000ec\u0017.\u001a8u!\tY\u0013'D\u0001-\u0015\tiRF\u0003\u0002/_\u00059Qn\u001c8h_\u0012\u0014'\"\u0001\u0019\u0002\u0007=\u0014x-\u0003\u00023Y\tYQj\u001c8h_\u000ec\u0017.\u001a8u\u0003-iwN\\4p\u0007>tg-[4\u0011\u0005U*eB\u0001\u001cD\u001d\t9$I\u0004\u00029\u0003:\u0011\u0011\b\u0011\b\u0003u}r!a\u000f 
\u000e\u0003qR!!\u0010\r\u0002\rq\u0012xn\u001c;?\u0013\u00059\u0012BA\u000b\u0017\u0013\t\u0019B#\u0003\u0002\u0012%%\u0011q\u0002E\u0005\u0003\t:\t\u0001dQ8oM&<WO]1uS>tWj\u001c8h_6{G-\u001a7t\u0013\t1uIA\bCCN,Wj\u001c8h_\u000e{gNZ5h\u0015\t!e\"\u0001\u0004=S:LGO\u0010\u000b\u0004\u00152k\u0005CA&\u0001\u001b\u0005q\u0001\"B\u0015\u0004\u0001\u0004Q\u0003\"B\u001a\u0004\u0001\u0004!\u0014A\u00029beN,'/F\u0001Q!\t\tfL\u0004\u0002S7:\u00111\u000b\u0017\b\u0003)Zs!aO+\n\u0003AJ!aV\u0018\u0002\r\u0005\u0004\u0018m\u00195f\u0013\tI&,\u0001\u0003bmJ|'BA,0\u0013\taV,\u0001\u0004TG\",W.\u0019\u0006\u00033jK!a\u00181\u0003\rA\u000b'o]3s\u0015\taV,\u0001\u0005gk2dGj\\1e)\u0005\u0019\u0007c\u00013jY:\u0011Qm\u001a\b\u0003w\u0019L\u0011!H\u0005\u0003Qr\tq\u0001]1dW\u0006<W-\u0003\u0002kW\n\u00191+Z9\u000b\u0005!d\u0002\u0003B\u000en_JL!A\u001c\u000f\u0003\rQ+\b\u000f\\33!\tY\u0002/\u0003\u0002r9\t!Aj\u001c8h!\t\u0019H/D\u0001^\u0013\t)XL\u0001\u0004TG\",W.Y\u0001\bKb$(/Y2u+\rA\u00181\u0001\u000b\bs\u0006U\u0011QFA!!\rQXp`\u0007\u0002w*\u0011A\u0010H\u0001\u0005kRLG.\u0003\u0002\u007fw\n\u0019AK]=\u0011\t\u0005\u0005\u00111\u0001\u0007\u0001\t\u001d\t)A\u0002b\u0001\u0003\u000f\u0011\u0011!Q\t\u0005\u0003\u0013\ty\u0001E\u0002\u001c\u0003\u0017I1!!\u0004\u001d\u0005\u001dqu\u000e\u001e5j]\u001e\u00042aGA\t\u0013\r\t\u0019\u0002\b\u0002\u0004\u0003:L\bbBA\f\r\u0001\u0007\u0011\u0011D\u0001\u0002IB!\u00111DA\u0014\u001d\u0011\ti\"!\n\u000f\t\u0005}\u00111\u0005\b\u0004)\u0006\u0005\u0012B\u0001\u00180\u0013\tiR&\u0003\u0002iY%!\u0011\u0011FA\u0016\u0005!!unY;nK:$(B\u00015-\u0011\u001d\tyC\u0002a\u0001\u0003c\t\u0011BZ5fY\u0012t\u0015-\\3\u0011\t\u0005M\u00121\b\b\u0005\u0003k\t9\u0004\u0005\u0002<9%\u0019\u0011\u0011\b\u000f\u0002\rA\u0013X\rZ3g\u0013\u0011\ti$a\u0010\u0003\rM#(/\u001b8h\u0015\r\tI\u0004\b\u0005\b\u0003\u00072\u0001\u0019AA#\u0003\u00051\u0007CB\u000e\u0002H\u0005-s0C\u0002\u0002Jq\u0011\u0011BR;oGRLwN\\\u0019\u0011\t\u00055\u0013\u0011\f\b\u0005\u0003\u001f\n)F\
u0004\u0003\u0002\u001e\u0005E\u0013bAA*Y\u0005!!m]8o\u0013\rA\u0017q\u000b\u0006\u0004\u0003'b\u0013\u0002BA.\u0003;\u0012\u0011BQ:p]Z\u000bG.^3\u000b\u0007!\f9&\u0001\u0004j]N,'\u000f\u001e\u000b\u0005\u0003G\nI\u0007E\u0002\u001c\u0003KJ1!a\u001a\u001d\u0005\u0011)f.\u001b;\t\r\u0005-t\u00011\u0001d\u0003\u001d\u00198\r[3nCN\f\u0011#\u001b8tKJ$\u0018J\u001a(pi\u0016C\u0018n\u001d;t)\u0019\t\u0019'!\u001d\u0002|!9\u00111\u000f\u0005A\u0002\u0005U\u0014AC2pY2,7\r^5p]B)1&a\u001e\u0002\u001a%\u0019\u0011\u0011\u0010\u0017\u0003\u001f5{gnZ8D_2dWm\u0019;j_:Dq!! \t\u0001\u0004\ty(\u0001\u0005e_\u000e,X.\u001a8u!\u0011\ti%!!\n\t\u0005\r\u0015Q\f\u0002\r\u0005N|g\u000eR8dk6,g\u000e^\u0001\fGJ,\u0017\r^3UC\ndW\r\u0006\u0002\u0002d\u0005YA/\u00192mK\u0016C\u0018n\u001d;t)\t\ti\tE\u0002\u001c\u0003\u001fK1!!%\u001d\u0005\u001d\u0011un\u001c7fC:\f\u0011\u0003^1cY\u0016\u001c%/Z1uS>t\u0007*\u001b8u)\t\t\t$\u0001\u0006gS:$7k\u00195f[\u0006$B!a'\u0002\"B!1$!(s\u0013\r\ty\n\b\u0002\u0007\u001fB$\u0018n\u001c8\t\r\u0005\rF\u00021\u0001p\u0003\tIG\r")
/* loaded from: input_file:it/agilelab/darwin/connector/mongo/MongoConnector.class */
public class MongoConnector implements Connector, Logging {
    private final MongoClient mongoClient;
    private final ConfigurationMongoModels.BaseMongoConfig mongoConfig;
    private Logger it$agilelab$darwin$common$Logging$$_log;
    private volatile boolean bitmap$0;

    /**
     * Returns the logger for this connector by forwarding to the static
     * {@code Logging.log$} helper (the implementation lives in {@code Logging}, not here).
     */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards to the static {@code Connector.fingerprint$} helper for the given schema.
     * The fingerprint algorithm is defined in {@code Connector}, not in this class.
     *
     * @param schema the Avro schema passed through to the helper
     * @return the value produced by {@code Connector.fingerprint$}
     */
    public long fingerprint(Schema schema) {
        return Connector.fingerprint$(this, schema);
    }

    /**
     * Forwards to the static {@code Connector.writeHeaderToStream$} helper.
     *
     * @param outputStream the stream handed to the helper
     * @param j            a long passed through unchanged (presumably the schema id — confirm in Connector)
     * @param byteOrder    the byte order handed to the helper
     * @return whatever stream {@code Connector.writeHeaderToStream$} returns
     */
    public OutputStream writeHeaderToStream(OutputStream outputStream, long j, ByteOrder byteOrder) {
        return Connector.writeHeaderToStream$(this, outputStream, j, byteOrder);
    }

    /**
     * Byte-array overload: forwards all arguments to the static
     * {@code Connector.generateAvroSingleObjectEncoded$} helper.
     *
     * @param bArr      payload bytes passed through unchanged
     * @param schema    the Avro schema passed through unchanged
     * @param byteOrder the byte order passed through unchanged
     * @param function1 function from schema to a boxed value (presumably the schema id lookup — confirm in Connector)
     * @return the bytes produced by the helper
     */
    public byte[] generateAvroSingleObjectEncoded(byte[] bArr, Schema schema, ByteOrder byteOrder, Function1<Schema, Object> function1) {
        return Connector.generateAvroSingleObjectEncoded$(this, bArr, schema, byteOrder, function1);
    }

    /**
     * Stream overload taking the payload as a byte array: forwards all arguments to the static
     * {@code Connector.generateAvroSingleObjectEncoded$} helper.
     *
     * @param outputStream the destination stream handed to the helper
     * @param bArr         payload bytes passed through unchanged
     * @param j            a long passed through unchanged (presumably the schema id — confirm in Connector)
     * @param byteOrder    the byte order passed through unchanged
     * @return whatever stream the helper returns
     */
    public OutputStream generateAvroSingleObjectEncoded(OutputStream outputStream, byte[] bArr, long j, ByteOrder byteOrder) {
        return Connector.generateAvroSingleObjectEncoded$(this, outputStream, bArr, j, byteOrder);
    }

    /**
     * Stream overload taking a stream-to-stream function: forwards all arguments to the static
     * {@code Connector.generateAvroSingleObjectEncoded$} helper.
     *
     * @param outputStream the destination stream handed to the helper
     * @param j            a long passed through unchanged (presumably the schema id — confirm in Connector)
     * @param byteOrder    the byte order passed through unchanged
     * @param function1    function from stream to stream (presumably writes the Avro payload — confirm in Connector)
     * @return whatever stream the helper returns
     */
    public OutputStream generateAvroSingleObjectEncoded(OutputStream outputStream, long j, ByteOrder byteOrder, Function1<OutputStream, OutputStream> function1) {
        return Connector.generateAvroSingleObjectEncoded$(this, outputStream, j, byteOrder, function1);
    }

    /**
     * Byte-array overload: forwards to the static {@code Connector.retrieveSchemaAndAvroPayload$} helper.
     *
     * @param bArr      input bytes passed through unchanged
     * @param byteOrder the byte order passed through unchanged
     * @param function1 function from a boxed value to an optional schema
     *                  (presumably an id-to-schema lookup — confirm in Connector)
     * @return the (schema, bytes) pair produced by the helper
     */
    public Tuple2<Schema, byte[]> retrieveSchemaAndAvroPayload(byte[] bArr, ByteOrder byteOrder, Function1<Object, Option<Schema>> function1) {
        return Connector.retrieveSchemaAndAvroPayload$(this, bArr, byteOrder, function1);
    }

    /**
     * {@link ByteBuffer} overload: forwards to the static {@code Connector.retrieveSchemaAndAvroPayload$} helper.
     * Only a schema is returned; what happens to the buffer is decided by the helper, not by this method.
     *
     * @param byteBuffer the buffer handed to the helper
     * @param byteOrder  the byte order passed through unchanged
     * @param function1  function from a boxed value to an optional schema
     *                   (presumably an id-to-schema lookup — confirm in Connector)
     * @return the schema produced by the helper
     */
    public Schema retrieveSchemaAndAvroPayload(ByteBuffer byteBuffer, ByteOrder byteOrder, Function1<Object, Option<Schema>> function1) {
        return Connector.retrieveSchemaAndAvroPayload$(this, byteBuffer, byteOrder, function1);
    }

    /**
     * Stream overload: forwards to the static {@code Connector.extractSchema$} helper.
     *
     * @param inputStream the stream handed to the helper
     * @param byteOrder   the byte order passed through unchanged
     * @param function1   function from a boxed value to an optional schema
     *                    (presumably an id-to-schema lookup — confirm in Connector)
     * @return an {@code Either} holding bytes on the left or a schema on the right, as produced by the helper
     */
    public Either<byte[], Schema> extractSchema(InputStream inputStream, ByteOrder byteOrder, Function1<Object, Option<Schema>> function1) {
        return Connector.extractSchema$(this, inputStream, byteOrder, function1);
    }

    /**
     * Byte-array overload: forwards to the static {@code Connector.extractSchema$} helper.
     *
     * @param bArr      input bytes passed through unchanged
     * @param byteOrder the byte order passed through unchanged
     * @param function1 function from a boxed value to an optional schema
     *                  (presumably an id-to-schema lookup — confirm in Connector)
     * @return an {@code Either} holding an exception on the left or a schema on the right, as produced by the helper
     */
    public Either<Exception, Schema> extractSchema(byte[] bArr, ByteOrder byteOrder, Function1<Object, Option<Schema>> function1) {
        return Connector.extractSchema$(this, bArr, byteOrder, function1);
    }

    /**
     * Byte-array overload: forwards to the static {@code Connector.extractId$} helper.
     *
     * @param bArr      input bytes passed through unchanged
     * @param byteOrder the byte order passed through unchanged
     * @return the long produced by the helper
     */
    public long extractId(byte[] bArr, ByteOrder byteOrder) {
        return Connector.extractId$(this, bArr, byteOrder);
    }

    /**
     * Stream overload: forwards to the static {@code Connector.extractId$} helper.
     *
     * @param inputStream the stream handed to the helper
     * @param byteOrder   the byte order passed through unchanged
     * @return an {@code Either} holding bytes on the left or a boxed value on the right
     *         (presumably a boxed long id — confirm in Connector)
     */
    public Either<byte[], Object> extractId(InputStream inputStream, ByteOrder byteOrder) {
        return Connector.extractId$(this, inputStream, byteOrder);
    }

    /**
     * {@link ByteBuffer} overload: forwards to the static {@code Connector.extractId$} helper.
     *
     * @param byteBuffer the buffer handed to the helper
     * @param byteOrder  the byte order passed through unchanged
     * @return the long produced by the helper
     */
    public long extractId(ByteBuffer byteBuffer, ByteOrder byteOrder) {
        return Connector.extractId$(this, byteBuffer, byteOrder);
    }

    /**
     * Forwards to the static {@code Connector.retrieveSchemaAndPayload$} helper.
     *
     * @param bArr      input bytes passed through unchanged
     * @param byteOrder the byte order passed through unchanged
     * @param function1 function from a boxed value to an optional schema
     *                  (presumably an id-to-schema lookup — confirm in Connector)
     * @return the {@code SchemaPayloadPair} produced by the helper
     */
    public SchemaPayloadPair retrieveSchemaAndPayload(byte[] bArr, ByteOrder byteOrder, Function1<Object, Option<Schema>> function1) {
        return Connector.retrieveSchemaAndPayload$(this, bArr, byteOrder, function1);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [it.agilelab.darwin.connector.mongo.MongoConnector] */
    private Logger it$agilelab$darwin$common$Logging$$_log$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.it$agilelab$darwin$common$Logging$$_log = Logging.it$agilelab$darwin$common$Logging$$_log$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.it$agilelab$darwin$common$Logging$$_log;
    }

    /**
     * Accessor for the lazily initialised logger field.
     *
     * <p>When the {@code bitmap$0} flag is already set the cached field is returned directly;
     * otherwise the call goes through the {@code $lzycompute} slow path.
     *
     * @return the logger stored in (or just computed for) this instance
     */
    public Logger it$agilelab$darwin$common$Logging$$_log() {
        if (this.bitmap$0) {
            return this.it$agilelab$darwin$common$Logging$$_log;
        }
        return it$agilelab$darwin$common$Logging$$_log$lzycompute();
    }

    /**
     * Creates a new Avro {@code Schema.Parser} on every call; no parser instance is cached or shared.
     *
     * @return a fresh parser
     */
    private Schema.Parser parser() {
        return new Schema.Parser();
    }

    /**
     * Loads every document of the configured collection and converts each into an (id, schema) pair.
     *
     * <p>Steps, as visible in the code:
     * <ol>
     *   <li>runs an unfiltered {@code find} on {@code mongoConfig.database()} / {@code mongoConfig.collection()};</li>
     *   <li>for each document, reads {@code "_id"} as an int64 and then delegates to
     *       {@code $anonfun$fullLoad$3}, which reads and parses the {@code "schema"} field; each document
     *       yields a {@code Try};</li>
     *   <li>blocks on the resulting future for at most {@code mongoConfig.timeout()};</li>
     *   <li>unwraps every {@code Try} with {@code get()}, so a single failed document makes the whole
     *       call throw instead of being skipped.</li>
     * </ol>
     *
     * @return the (id, schema) pairs for all documents in the collection
     */
    public Seq<Tuple2<Object, Schema>> fullLoad() {
        log().debug(new StringBuilder(36).append("loading all schemas from collection ").append(this.mongoConfig.collection()).toString());
        Seq seq = (Seq) Await$.MODULE$.result(package$.MODULE$.ScalaObservable(package$.MODULE$.ScalaObservable(this.mongoClient.getDatabase(this.mongoConfig.database()).getCollection(this.mongoConfig.collection(), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(Document.class)).find(DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(Document.class))).map(document -> {
            return this.extract(document, "_id", bsonValue -> {
                return BoxesRunTime.boxToLong($anonfun$fullLoad$2(bsonValue));
            }).flatMap(obj -> {
                return $anonfun$fullLoad$3(this, document, BoxesRunTime.unboxToLong(obj));
            });
        })).toFuture(), this.mongoConfig.timeout());
        // The count logged here is the number of Try results, including any failures.
        log().debug(new StringBuilder(20).append(seq.size()).append(" loaded from MongoDB").toString());
        // Try.get() throws for a failed element, so extraction/parsing errors surface here.
        return (Seq) seq.map(r2 -> {
            return (Tuple2) r2.get();
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Reads one field of a document and converts its value, capturing the outcome in a {@code Try}.
     *
     * <p>The document is filtered down to the entries whose key equals {@code str} and the first
     * such entry is used. When no entry matches, the result is a {@code Failure} wrapping a
     * {@code RuntimeException} with the message {@code "Cannot find <str> field in document"}.
     * Otherwise {@code function1} is applied to the value inside {@code Try.apply}, and the result is
     * passed through {@code recoverWith} using a compiler-generated partial-function class that is
     * not part of this file (NOTE(review): presumably it rewraps conversion errors with the field
     * name — confirm in the generated class).
     *
     * @param document  the document to read from
     * @param str       the field name to look up
     * @param function1 conversion applied to the field's {@code BsonValue}
     * @param <A>       the converted type
     * @return a {@code Try} holding the converted value or the failure
     */
    private <A> Try<A> extract(Document document, String str, Function1<BsonValue, A> function1) {
        return (Try) ((TraversableLike) document.filterKeys(str2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$extract$1(str, str2));
        })).headOption().fold(() -> {
            return new Failure(new RuntimeException(new StringBuilder(30).append("Cannot find ").append(str).append(" field in document").toString()));
        }, tuple2 -> {
            // A null tuple mirrors a failed Scala pattern match on (key, value).
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            BsonValue bsonValue = (BsonValue) tuple2._2();
            return Try$.MODULE$.apply(() -> {
                return function1.apply(bsonValue);
            }).recoverWith(new MongoConnector$$anonfun$$nestedInanonfun$extract$3$1(null, str));
        });
    }

    /**
     * Stores each (id, schema) pair of {@code seq} in the configured collection.
     *
     * <p>A debug line naming the collection is logged first; every pair is then handed, in
     * iteration order, to {@code $anonfun$insert$1}, which builds and inserts the document.
     *
     * @param seq the (id, schema) pairs to store
     */
    public void insert(Seq<Tuple2<Object, Schema>> seq) {
        log().debug("inclusion of new schemas in the collection " + this.mongoConfig.collection());
        seq.foreach(idAndSchema -> {
            $anonfun$insert$1(this, idAndSchema);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Inserts {@code bsonDocument} into {@code mongoCollection}, treating a duplicate key as a no-op.
     *
     * <p>The insert is awaited for at most {@code mongoConfig.timeout()}. A
     * {@link MongoWriteException} whose error category is {@link ErrorCategory#DUPLICATE_KEY} is
     * logged at info level and swallowed; every other throwable is rethrown unchanged.
     *
     * @param mongoCollection the collection to insert into
     * @param bsonDocument    the document to insert
     */
    private void insertIfNotExists(MongoCollection<Document> mongoCollection, BsonDocument bsonDocument) {
        try {
            Await$.MODULE$.result(package$.MODULE$.ScalaSingleObservable(mongoCollection.insertOne(package$.MODULE$.bsonDocumentToDocument(bsonDocument))).toFuture(), this.mongoConfig.timeout());
        } catch (Throwable th) {
            // getError() is declared on MongoWriteException, so the cast is required after the type test.
            if (th instanceof MongoWriteException
                    && ((MongoWriteException) th).getError().getCategory() == ErrorCategory.DUPLICATE_KEY) {
                log().info("document already present, doing nothing");
                return;
            }
            throw th;
        }
    }

    /**
     * Attempts to create the configured collection in the configured database.
     *
     * <p>The creation is awaited for at most {@code mongoConfig.timeout()}. Success is logged at
     * info level. Any {@link Exception} is also only logged at info level (collection name plus the
     * exception message) and is not propagated to the caller.
     */
    public void createTable() {
        log().debug("Creating collection " + this.mongoConfig.collection());
        try {
            Await$.MODULE$.result(
                    package$.MODULE$.ScalaSingleObservable(
                            this.mongoClient
                                    .getDatabase(this.mongoConfig.database())
                                    .createCollection(this.mongoConfig.collection()))
                            .toFuture(),
                    this.mongoConfig.timeout());
            log().info("collection " + this.mongoConfig.collection() + " has been correctly created");
        } catch (Exception creationFailure) {
            log().info("collection " + this.mongoConfig.collection() + " was not created. \n " + creationFailure.getMessage());
        }
    }

    /**
     * Checks whether the configured collection exists in the configured database.
     *
     * <p>Lists the database's collection names, keeps those equal to {@code mongoConfig.collection()},
     * counts them on the global execution context and blocks for at most {@code mongoConfig.timeout()}.
     *
     * @return {@code true} only when exactly one collection name matches
     */
    public boolean tableExists() {
        return BoxesRunTime.unboxToInt(Await$.MODULE$.result(package$.MODULE$.ScalaObservable(package$.MODULE$.ScalaObservable(this.mongoClient.getDatabase(this.mongoConfig.database()).listCollectionNames()).filter(str -> {
            return BoxesRunTime.boxToBoolean($anonfun$tableExists$1(this, str));
        })).toFuture().map(seq -> {
            return BoxesRunTime.boxToInteger(seq.size());
        }, ExecutionContext$Implicits$.MODULE$.global()), this.mongoConfig.timeout())) == 1;
    }

    /**
     * Builds a human-readable hint describing the shell command that creates the collection.
     *
     * <p>The text embeds {@code mongoConfig.collection()} and is passed through Scala's
     * {@code stripMargin} before being returned.
     *
     * @return the hint text
     */
    public String tableCreationHint() {
        String rawHint = "To create the collection from shell perform the following command:\n       |db.createCollection("
                + this.mongoConfig.collection()
                + ")\n     ";
        return new StringOps(Predef$.MODULE$.augmentString(rawHint)).stripMargin();
    }

    /**
     * Looks up a single schema by id.
     *
     * <p>Queries the configured collection with the filter {@code {"_id": <j as int64>}}, blocking
     * for at most {@code mongoConfig.timeout()}. From the returned documents it collects the string
     * values of the entries keyed {@code "schema"}, takes the first one (if any) and parses it with
     * a fresh Avro parser.
     *
     * @param j the id to search for
     * @return the parsed schema, or an empty {@code Option} when no {@code "schema"} value was found
     */
    public Option<Schema> findSchema(long j) {
        BasicDBObject basicDBObject = new BasicDBObject();
        basicDBObject.put("_id", BsonInt64$.MODULE$.apply(j));
        Option headOption = ((Seq) ((TraversableLike) Await$.MODULE$.result(package$.MODULE$.ScalaObservable(this.mongoClient.getDatabase(this.mongoConfig.database()).getCollection(this.mongoConfig.collection(), DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(Document.class)).find(basicDBObject, DefaultHelper$DefaultsTo$.MODULE$.default(), ClassTag$.MODULE$.apply(Document.class))).toFuture(), this.mongoConfig.timeout())).flatMap(document -> {
            return (Traversable) document.withFilter(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$findSchema$2(tuple2));
            }).map(tuple22 -> {
                return ((BsonValue) tuple22._2()).asString().getValue();
            }, Traversable$.MODULE$.canBuildFrom());
        }, Seq$.MODULE$.canBuildFrom())).headOption();
        // Parsing is not wrapped in a Try here, so a malformed schema string propagates as an exception.
        Schema.Parser parser = parser();
        return headOption.map(str -> {
            return parser.parse(str);
        });
    }

    /**
     * Lambda body used by {@code fullLoad}: reads a BSON value as an int64 and returns its long value.
     *
     * @param bsonValue the value of a document's {@code "_id"} field
     * @return the long held by the value
     */
    public static final /* synthetic */ long $anonfun$fullLoad$2(BsonValue bsonValue) {
        return bsonValue.asInt64().getValue();
    }

    /**
     * Lambda body used by {@code fullLoad}: turns one document plus its already-extracted id into a
     * {@code Try} of an (id, schema) pair.
     *
     * <p>The {@code "schema"} field is extracted as a string, parsed with a fresh Avro parser inside
     * {@code Try.apply}, and paired with {@code j}.
     *
     * @param mongoConnector the connector whose {@code extract} and {@code parser} are used
     * @param document       the document being converted
     * @param j              the id previously read from the document's {@code "_id"} field
     * @return a {@code Try} holding the (boxed id, schema) pair, or the extraction/parse failure
     */
    public static final /* synthetic */ Try $anonfun$fullLoad$3(MongoConnector mongoConnector, Document document, long j) {
        return mongoConnector.extract(document, "schema", bsonValue -> {
            return bsonValue.asString().getValue();
        }).flatMap(str -> {
            return Try$.MODULE$.apply(() -> {
                return mongoConnector.parser().parse(str);
            }).map(schema -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToLong(j)), schema);
            });
        });
    }

    /**
     * Lambda body used by {@code extract}: null-safe equality between a document key and the
     * requested field name.
     *
     * @param str  the requested field name
     * @param str2 the key of the document entry being tested
     * @return {@code true} when both are {@code null} or when {@code str2.equals(str)}
     */
    public static final /* synthetic */ boolean $anonfun$extract$1(String str, String str2) {
        if (str2 == null) {
            return str == null;
        }
        return str2.equals(str);
    }

    /**
     * Lambda body used by {@code insert}: converts one (id, schema) pair into a BSON document and
     * inserts it unless it already exists.
     *
     * <p>The document carries four fields: {@code "_id"} (int64 id), {@code "schema"} (the schema's
     * {@code toString()}), {@code "name"} and {@code "namespace"}.
     * NOTE(review): {@code getNamespace()} may yield {@code null} for some schemas and it is not
     * visible here whether {@code BsonString$.apply} accepts that — confirm.
     *
     * @param mongoConnector the connector providing the client, configuration and insert helper
     * @param tuple2         the (id, schema) pair; a {@code null} pair raises {@code MatchError}
     */
    public static final /* synthetic */ void $anonfun$insert$1(MongoConnector mongoConnector, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        long schemaId = tuple2._1$mcJ$sp();
        Schema avroSchema = (Schema) tuple2._2();

        BsonDocument entry = new BsonDocument();
        entry.put("_id", BsonInt64$.MODULE$.apply(schemaId));
        entry.put("schema", BsonString$.MODULE$.apply(avroSchema.toString()));
        entry.put("name", BsonString$.MODULE$.apply(avroSchema.getName()));
        entry.put("namespace", BsonString$.MODULE$.apply(avroSchema.getNamespace()));

        mongoConnector.insertIfNotExists(
                mongoConnector.mongoClient
                        .getDatabase(mongoConnector.mongoConfig.database())
                        .getCollection(
                                mongoConnector.mongoConfig.collection(),
                                DefaultHelper$DefaultsTo$.MODULE$.default(),
                                ClassTag$.MODULE$.apply(Document.class)),
                entry);
    }

    /**
     * Lambda body used by {@code tableExists}: null-safe comparison of a listed collection name
     * with the configured collection name.
     *
     * @param mongoConnector the connector whose configuration supplies the expected name
     * @param str            a collection name returned by the database
     * @return {@code true} when both names are {@code null} or when {@code str} equals the configured name
     */
    public static final /* synthetic */ boolean $anonfun$tableExists$1(MongoConnector mongoConnector, String str) {
        String expectedName = mongoConnector.mongoConfig.collection();
        if (str == null) {
            return expectedName == null;
        }
        return str.equals(expectedName);
    }

    /**
     * Lambda body used by {@code findSchema}: tells whether a document entry is the
     * {@code "schema"} field.
     *
     * @param tuple2 a (key, value) entry of a document
     * @return {@code true} when the entry's key is non-null and equals {@code "schema"}
     */
    public static final /* synthetic */ boolean $anonfun$findSchema$2(Tuple2 tuple2) {
        Object key = tuple2._1();
        // A null key can never match the non-null literal, so it simply yields false.
        return key != null && key.equals("schema");
    }

    /**
     * Creates a connector bound to the given client and configuration.
     *
     * <p>Both arguments are stored as-is (no null checks, no copies), after which the static
     * {@code Connector.$init$} and {@code Logging.$init$} hooks are invoked for this instance
     * (presumably the Scala trait initialisers — confirm in those types).
     *
     * @param mongoClient     the client used for all database access
     * @param baseMongoConfig the configuration supplying database name, collection name and timeout
     */
    public MongoConnector(MongoClient mongoClient, ConfigurationMongoModels.BaseMongoConfig baseMongoConfig) {
        this.mongoClient = mongoClient;
        this.mongoConfig = baseMongoConfig;
        Connector.$init$(this);
        Logging.$init$(this);
    }
}
