package it.agilelab.bigdata.wasp.consumers.spark.plugins.cdc;

import io.delta.tables.DeltaTable;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.CdcModel;
import it.agilelab.bigdata.wasp.models.GenericCdcMutationFields$;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import scala.Array$;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;

/* compiled from: DeltaLakeWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001U4A!\u0001\u0002\u0001'\tyA)\u001a7uC2\u000b7.Z,sSR,'O\u0003\u0002\u0004\t\u0005\u00191\rZ2\u000b\u0005\u00151\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\u0013\r|gn];nKJ\u001c(BA\u0006\r\u0003\u00119\u0018m\u001d9\u000b\u00055q\u0011a\u00022jO\u0012\fG/\u0019\u0006\u0003\u001fA\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002#\u0005\u0011\u0011\u000e^\u0002\u0001'\u0011\u0001AC\u0007\u0010\u0011\u0005UAR\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\r\u0005s\u0017PU3g!\tYB$D\u0001\u0003\u0013\ti\"A\u0001\u0004Xe&$XM\u001d\t\u0003?\u0011j\u0011\u0001\t\u0006\u0003C\t\nq\u0001\\8hO&twM\u0003\u0002$\u0015\u0005!1m\u001c:f\u0013\t)\u0003EA\u0004M_\u001e<\u0017N\\4\t\u0011\u001d\u0002!\u0011!Q\u0001\n!\nQ!\\8eK2\u0004\"!\u000b\u0017\u000e\u0003)R!a\u000b\u0006\u0002\r5|G-\u001a7t\u0013\ti#F\u0001\u0005DI\u000elu\u000eZ3m\u0011!y\u0003A!A!\u0002\u0013\u0001\u0014AA:t!\t\t\u0014(D\u00013\u0015\t\u0019D'A\u0002tc2T!aB\u001b\u000b\u0005Y:\u0014AB1qC\u000eDWMC\u00019\u0003\ry'oZ\u0005\u0003uI\u0012Ab\u00159be.\u001cVm]:j_:DQ\u0001\u0010\u0001\u0005\u0002u\na\u0001P5oSRtDc\u0001 @\u0001B\u00111\u0004\u0001\u0005\u0006Om\u0002\r\u0001\u000b\u0005\u0006_m\u0002\r\u0001\r\u0005\u0006\u0005\u0002!\taQ\u0001\u0006oJLG/\u001a\u000b\u0004\t\u001e[\u0006CA\u000bF\u0013\t1eC\u0001\u0003V]&$\b\"\u0002%B\u0001\u0004I\u0015A\u00013g!\tQ\u0005L\u0004\u0002L-:\u0011A*\u0016\b\u0003\u001bRs!AT*\u000f\u0005=\u0013V\"\u0001)\u000b\u0005E\u0013\u0012A\u0002\u001fs_>$h(C\u00019\u0013\t1t'\u0003\u0002\bk%\u00111\u0007N\u0005\u0003/J\nq\u0001]1dW\u0006<W-\u0003\u0002Z5\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003/JBQ\u0001X!A\u0002u\u000b!!\u001b3\u0011\u0005Uq\u0016BA0\u0017\u0005\u0011auN\\4\t\u000b\u0005\u0004A\u0011\u00012\u0002+\u001d,G\u000fT1uKN$8\t[1oO\u00164uN]&fsR\u0019\u0011j\u00193\t\u000b!\u0003\u0007\u0019A%\t\u000b\u0015\u0004\u0007\u0019\u00014\u0002\u0013-,\u0017PR5fY\u0012\u001c\bcA4l]:\u0011\u0001N\u001b\b\u0003\u001f&L\u0011aF\u0005\u0003/ZI!\u0001\\7\u0003\u0007M+\u0017O\u0003\u0002X-A\u0011qN\u001d\b\u0003+AL!!\u001d\f\u0002\rA\u0013X\rZ3g\u0013\t\u0019HO\u0001\u0004TiJLgn\u001a\u0006\u0003cZ\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/cdc/DeltaLakeWriter.class */
public class DeltaLakeWriter implements Writer, Logging {
    private final CdcModel model;
    private final SparkSession ss;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    @Override // it.agilelab.bigdata.wasp.consumers.spark.plugins.cdc.Writer
    public void write(Dataset<Row> dataset, long j) {
        String[] columns = dataset.selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"key.*"})).columns();
        DeltaTable deltaTable = DeltaLakeOperations$.MODULE$.getDeltaTable(this.model.uri(), dataset.selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ".*"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.AFTER_IMAGE()}))})).schema(), this.ss);
        Dataset persist = getLatestChangeForKey(dataset, Predef$.MODULE$.wrapRefArray(columns)).persist();
        try {
            String mkString = Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(columns).map(new DeltaLakeWriter$$anonfun$1(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)))).mkString(" AND ");
            String[] columns2 = persist.selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"_value.*"})).drop(Predef$.MODULE$.wrapRefArray(columns)).columns();
            deltaTable.as("table").merge(persist.selectExpr(Predef$.MODULE$.wrapRefArray((Object[]) Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(columns2).map(new DeltaLakeWriter$$anonfun$4(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)))).$plus$colon("key.*", ClassTag$.MODULE$.apply(String.class)))).as("mutation"), mkString).whenMatched(Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(columns2).map(new DeltaLakeWriter$$anonfun$2(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)))).mkString(" AND ")).delete().whenMatched().updateAll().whenNotMatched(Predef$.MODULE$.refArrayOps((Object[]) Predef$.MODULE$.refArrayOps(columns2).map(new DeltaLakeWriter$$anonfun$3(this), Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(String.class)))).mkString(" OR ")).insertAll().execute();
        } finally {
            persist.unpersist();
        }
    }

    public Dataset<Row> getLatestChangeForKey(Dataset<Row> dataset, Seq<String> seq) {
        return dataset.selectExpr((Seq) ((SeqLike) seq.map(new DeltaLakeWriter$$anonfun$getLatestChangeForKey$1(this), Seq$.MODULE$.canBuildFrom())).$colon$plus(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"struct("})).s(Nil$.MODULE$)).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ", "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.TIMESTAMP()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ", "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.COMMIT_ID()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ", "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.AFTER_IMAGE()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ", "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.BEFORE_IMAGE()}))).append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"value.", ") as otherCols"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.TYPE()}))).toString(), Seq$.MODULE$.canBuildFrom())).groupBy((Seq) seq.map(new DeltaLakeWriter$$anonfun$getLatestChangeForKey$2(this), Seq$.MODULE$.canBuildFrom())).agg(functions$.MODULE$.max("otherCols").as("latest"), Predef$.MODULE$.wrapRefArray(new Column[0])).withColumn("key", functions$.MODULE$.struct((Seq) seq.map(new DeltaLakeWriter$$anonfun$5(this), Seq$.MODULE$.canBuildFrom()))).selectExpr(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"latest.", " as _timestamp"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.TIMESTAMP()})), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"latest.", " as _commitId"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.COMMIT_ID()})), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"latest.", " as _type"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.TYPE()})), new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"latest.", " as _value"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{GenericCdcMutationFields$.MODULE$.AFTER_IMAGE()})), "key"})));
    }

    public DeltaLakeWriter(CdcModel cdcModel, SparkSession sparkSession) {
        this.model = cdcModel;
        this.ss = sparkSession;
        Logging.class.$init$(this);
    }
}
