package io.qbeast.spark.delta.writer;

import io.qbeast.core.model.CubeId;
import io.qbeast.core.model.DataWriter;
import io.qbeast.core.model.IndexStatus;
import io.qbeast.core.model.QTableID;
import io.qbeast.core.model.QbeastBlock;
import io.qbeast.core.model.TableChanges;
import io.qbeast.spark.index.QbeastColumns$;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.qbeast.config.package$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.FileAction;
import org.apache.spark.sql.execution.datasources.OutputWriterFactory;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Seq$;
import scala.collection.mutable.ArrayOps;
import scala.collection.mutable.Builder;
import scala.collection.parallel.immutable.ParVector;
import scala.collection.parallel.immutable.ParVector$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

/* compiled from: SparkDeltaDataWriter.scala */
/* loaded from: input_file:io/qbeast/spark/delta/writer/SparkDeltaDataWriter$.class */
/**
 * Decompiled Java view of the Scala singleton {@code SparkDeltaDataWriter} (see the
 * "compiled from: SparkDeltaDataWriter.scala" header above). It implements
 * {@code DataWriter<Dataset<Row>, StructType, FileAction>} with two Spark-based operations:
 * {@code write} writes a dataset as Parquet files after repartitioning it by the Qbeast cube
 * column, and {@code compact} rewrites groups of existing per-cube files through a
 * {@code Compactor}.
 *
 * <p>NOTE(review): this file looks like decompiler output (synthetic {@code $anonfun$...}
 * methods, the {@code MODULE$} singleton, raw Scala collection types). Behavioral changes
 * presumably belong in the Scala source rather than here. In particular, if this Java were
 * ever recompiled with javac, the lambdas passed to {@code RDD.mapPartitions} target
 * {@code scala.Function1}, which is not {@code Serializable}. javac only makes a lambda
 * serializable when its target type is, so Spark would likely reject those closures as not
 * serializable. Confirm this before relying on a javac build.
 */
public final class SparkDeltaDataWriter$ implements DataWriter<Dataset<Row>, StructType, FileAction> {
    /** The singleton instance; assigned by the private constructor during class initialization. */
    public static SparkDeltaDataWriter$ MODULE$;

    // Scala object initialization: constructing the instance is what populates MODULE$.
    static {
        new SparkDeltaDataWriter$();
    }

    /**
     * Writes {@code dataset} as Parquet files, running a {@link BlockWriter} over each partition.
     *
     * <p>The dataset is repartitioned by the Qbeast cube column
     * ({@code QbeastColumns.cubeColumnName()}), its physical plan is executed directly, and each
     * resulting partition is handed to {@code BlockWriter.writeRow}. The produced actions are
     * collected back to the driver.
     *
     * @param qTableID     table identifier; its {@code id()} is the first argument of the BlockWriter
     * @param structType   schema given to {@code ParquetFileFormat.prepareWrite} and to the
     *                     BlockWriter (the dataset's own schema is passed to the BlockWriter separately)
     * @param dataset      rows to write; the repartition step references the Qbeast cube column,
     *                     so the dataset is expected to contain it
     * @param tableChanges forwarded to the BlockWriter
     * @return the actions collected from all partitions (collected with an {@code AddFile} class tag),
     *         as an indexed Seq
     */
    public Seq<FileAction> write(QTableID qTableID, StructType structType, Dataset<Row> dataset, TableChanges tableChanges) {
        SparkSession sparkSession = dataset.sparkSession();
        // A fresh Hadoop Job provides the Configuration used to prepare the Parquet writer factory;
        // the same Configuration is wrapped in a SerializableConfiguration for the BlockWriter.
        Job job = Job.getInstance();
        BlockWriter blockWriter = new BlockWriter(qTableID.id(), structType, dataset.schema(), new ParquetFileFormat().prepareWrite(sparkSession, job, Predef$.MODULE$.Map().empty(), structType), new SerializableConfiguration(job.getConfiguration()), QbeastColumns$.MODULE$.apply(dataset), tableChanges);
        // (decompiler artifact) Predef is fetched up front for the refArrayOps conversion below.
        Predef$ predef$ = Predef$.MODULE$;
        // Group rows of the same cube into the same partition, then execute the physical plan
        // directly to obtain its RDD.
        RDD execute = dataset.repartition(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col(QbeastColumns$.MODULE$.cubeColumnName())})).queryExecution().executedPlan().execute();
        // Run BlockWriter.writeRow on every partition, collect the results to the driver and turn
        // the collected array into an indexed Seq.
        return new ArrayOps.ofRef(predef$.refArrayOps((Object[]) execute.mapPartitions(iterator -> {
            return blockWriter.writeRow(iterator);
        }, execute.mapPartitions$default$2(), ClassTag$.MODULE$.apply(AddFile.class)).collect())).toIndexedSeq();
    }

    /**
     * Splits each cube's blocks into groups that are candidates for compaction.
     *
     * <ol>
     *   <li>Within each cube, keep only blocks whose size is at least
     *       {@code MIN_FILE_SIZE_COMPACTION}.</li>
     *   <li>Drop cubes that have no blocks left.</li>
     *   <li>Walk each cube's remaining blocks in their existing order and pack them greedily.
     *       A new group is started whenever adding the next block would push the group's
     *       running size above {@code MAX_FILE_SIZE_COMPACTION}.</li>
     * </ol>
     *
     * <p>Edge cases that follow from the logic:
     * <ul>
     *   <li>If a cube's first block is already larger than {@code MAX_FILE_SIZE_COMPACTION},
     *       an empty group is emitted before it.</li>
     *   <li>A block larger than the maximum always ends up in a group of its own.</li>
     * </ul>
     * {@code compact} skips groups with at most one block, so such groups produce no work there.
     *
     * @param seq (cubeId, blocks) pairs, one per cube
     * @return (cubeId, group) pairs; a cube appears once per group it was split into
     * @throws scala.MatchError if an element of {@code seq} is null
     */
    public Seq<Tuple2<CubeId, Seq<QbeastBlock>>> groupFilesToCompact(Seq<Tuple2<CubeId, Seq<QbeastBlock>>> seq) {
        // Step 1: per cube, keep only blocks at or above the minimum compaction size.
        return (Seq) ((Seq) ((TraversableLike) seq.map(tuple2 -> {
            if (tuple2 != null) {
                return new Tuple2((CubeId) tuple2._1(), ((Seq) tuple2._2()).filter(qbeastBlock -> {
                    return BoxesRunTime.boxToBoolean($anonfun$groupFilesToCompact$2(qbeastBlock));
                }));
            }
            throw new MatchError(tuple2);
        }, Seq$.MODULE$.canBuildFrom())).filter(tuple22 -> {
            // Step 2: drop cubes whose filtered block list is empty.
            return BoxesRunTime.boxToBoolean($anonfun$groupFilesToCompact$3(tuple22));
        })).flatMap(tuple23 -> {
            // Step 3: greedily pack this cube's blocks into size-bounded groups.
            if (tuple23 == null) {
                throw new MatchError(tuple23);
            }
            CubeId cubeId = (CubeId) tuple23._1();
            Seq seq2 = (Seq) tuple23._2();
            // newBuilder collects finished groups; newBuilder2 is the group currently being
            // filled; create holds the running byte size of that current group.
            Builder newBuilder = scala.collection.Seq$.MODULE$.newBuilder();
            Builder newBuilder2 = scala.collection.Seq$.MODULE$.newBuilder();
            LongRef create = LongRef.create(0L);
            seq2.foreach(qbeastBlock -> {
                $anonfun$groupFilesToCompact$5(create, newBuilder, newBuilder2, qbeastBlock);
                return BoxedUnit.UNIT;
            });
            // Close the last (possibly partially filled) group.
            newBuilder.$plus$eq(newBuilder2.result());
            // Emit one (cubeId, group) pair per group.
            return (scala.collection.Seq) ((TraversableLike) newBuilder.result()).map(seq3 -> {
                return new Tuple2(cubeId, seq3.toIndexedSeq());
            }, scala.collection.Seq$.MODULE$.canBuildFrom());
        }, Seq$.MODULE$.canBuildFrom());
    }

    /**
     * Compacts groups of per-cube files into rewritten files.
     *
     * <p>The file lists taken from {@code indexStatus.cubesStatuses()} are grouped with
     * {@code groupFilesToCompact}. The groups are then processed through a {@code ParVector}, a
     * Scala parallel collection, so several groups may launch their Spark jobs concurrently from
     * the driver. For every group with more than one block:
     * <ul>
     *   <li>the block files are read back as Parquet, with paths resolved against
     *       {@code qTableID.id()};</li>
     *   <li>the data is repartitioned into a single partition;</li>
     *   <li>that partition is handed to a {@link Compactor}.</li>
     * </ul>
     *
     * @param qTableID     table identifier; {@code id()} is the parent path that block paths are
     *                     resolved against, and the whole identifier is passed to each Compactor
     * @param structType   schema used to prepare the Parquet writer and passed to each Compactor
     * @param indexStatus  source of the per-cube file lists
     * @param tableChanges forwarded to each Compactor
     * @return the FileActions collected from all Compactors, converted back to a sequential Seq
     * @throws scala.MatchError if a group tuple or its CubeId is null
     */
    public Seq<FileAction> compact(QTableID qTableID, StructType structType, IndexStatus indexStatus, TableChanges tableChanges) {
        // Uses the currently active session (there is no input dataset to take one from).
        SparkSession active = SparkSession$.MODULE$.active();
        // Per-cube file lists -> size-bounded groups -> parallel vector of groups.
        ParVector parVector = new ParVector(groupFilesToCompact(indexStatus.cubesStatuses().mapValues(cubeStatus -> {
            return cubeStatus.files();
        }).toIndexedSeq()).toVector());
        // A single writer factory and serializable Hadoop configuration are shared by every group.
        Job job = Job.getInstance();
        OutputWriterFactory prepareWrite = new ParquetFileFormat().prepareWrite(active, job, Predef$.MODULE$.Map().empty(), structType);
        SerializableConfiguration serializableConfiguration = new SerializableConfiguration(job.getConfiguration());
        return ((ParVector) parVector.flatMap(tuple2 -> {
            scala.collection.Seq indexedSeq;
            if (tuple2 != null) {
                CubeId cubeId = (CubeId) tuple2._1();
                Seq seq = (Seq) tuple2._2();
                if (cubeId != null) {
                    if (seq.size() <= 1) {
                        // Zero or one block: nothing to merge (this also skips the empty groups
                        // that groupFilesToCompact can emit).
                        indexedSeq = (scala.collection.Seq) Nil$.MODULE$;
                    } else {
                        // Resolve each block's path against the table id to get the files to read.
                        Seq seq2 = (Seq) seq.map(qbeastBlock -> {
                            return new Path(qTableID.id(), qbeastBlock.path()).toString();
                        }, Seq$.MODULE$.canBuildFrom());
                        Compactor compactor = new Compactor(qTableID, prepareWrite, serializableConfiguration, structType, cubeId, seq.toIndexedSeq(), tableChanges);
                        Predef$ predef$ = Predef$.MODULE$;
                        // Read the group's files and put all rows into a single partition.
                        RDD execute = active.read().format("parquet").load(seq2).repartition(1).queryExecution().executedPlan().execute();
                        // Let the Compactor process that partition; collect its FileActions.
                        indexedSeq = new ArrayOps.ofRef(predef$.refArrayOps((Object[]) execute.mapPartitions(iterator -> {
                            return compactor.writeBlock(iterator);
                        }, execute.mapPartitions$default$2(), ClassTag$.MODULE$.apply(FileAction.class)).collect())).toIndexedSeq();
                    }
                    return indexedSeq;
                }
            }
            // Mirrors the Scala pattern match: a null tuple or null CubeId does not match.
            throw new MatchError(tuple2);
        }, ParVector$.MODULE$.canBuildFrom())).seq();
    }

    /**
     * Synthetic body of the step-1 filter in {@code groupFilesToCompact}.
     *
     * @return true when the block's size is at least {@code MIN_FILE_SIZE_COMPACTION}
     */
    public static final /* synthetic */ boolean $anonfun$groupFilesToCompact$2(QbeastBlock qbeastBlock) {
        return qbeastBlock.size() >= ((long) package$.MODULE$.MIN_FILE_SIZE_COMPACTION());
    }

    /**
     * Synthetic body of the step-2 filter in {@code groupFilesToCompact}.
     *
     * @return true when the cube's (already filtered) block collection is non-empty
     */
    public static final /* synthetic */ boolean $anonfun$groupFilesToCompact$3(Tuple2 tuple2) {
        return ((TraversableOnce) tuple2._2()).nonEmpty();
    }

    /**
     * Synthetic body of the greedy packing step in {@code groupFilesToCompact}.
     *
     * <p>If adding {@code qbeastBlock} would push the current group's running size above
     * {@code MAX_FILE_SIZE_COMPACTION}, the current group ({@code builder2}) is closed. It is
     * appended to {@code builder}, then cleared, and the running size is reset. The block is then
     * added to the current group and its size is added to the running total.
     *
     * @param longRef     running size of the current group (mutated)
     * @param builder     accumulator of finished groups (mutated)
     * @param builder2    the group currently being filled (mutated)
     * @param qbeastBlock the next block to place
     */
    public static final /* synthetic */ void $anonfun$groupFilesToCompact$5(LongRef longRef, Builder builder, Builder builder2, QbeastBlock qbeastBlock) {
        if (qbeastBlock.size() + longRef.elem > package$.MODULE$.MAX_FILE_SIZE_COMPACTION()) {
            builder.$plus$eq(builder2.result());
            builder2.clear();
            longRef.elem = 0L;
        }
        builder2.$plus$eq(qbeastBlock);
        longRef.elem += qbeastBlock.size();
    }

    // Private: the only instance is created by the static initializer and published via MODULE$.
    private SparkDeltaDataWriter$() {
        MODULE$ = this;
    }
}
