package ai.tripl.arc.extract;

import ai.tripl.arc.api.API;
import ai.tripl.arc.util.CloudUtils$;
import ai.tripl.arc.util.log.logger.Logger;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Literal$;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaLog$;
import org.apache.spark.sql.delta.DeltaTableUtils$;
import org.apache.spark.sql.delta.DeltaTimeTravelSpec;
import org.apache.spark.sql.delta.actions.CommitInfo;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.Tuple11;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.math.Ordering$;
import scala.math.Ordering$Long$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;

/* compiled from: DeltaLakeExtract.scala */
/* loaded from: input_file:ai/tripl/arc/extract/DeltaLakeExtractStage$.class */
public final class DeltaLakeExtractStage$ implements Serializable {
    public static DeltaLakeExtractStage$ MODULE$;

    static {
        new DeltaLakeExtractStage$();
    }

    public Option<Dataset<Row>> execute(DeltaLakeExtractStage deltaLakeExtractStage, SparkSession sparkSession, Logger logger, API.ARCContext aRCContext) {
        CommitInfo commitInfo;
        Dataset dataset;
        Dataset repartition;
        Dataset dataset2;
        Dataset dataset3;
        CloudUtils$.MODULE$.setHadoopConfiguration(deltaLakeExtractStage.authentication(), sparkSession, logger);
        try {
            if (!aRCContext.isStreaming()) {
                DeltaLog forTable = DeltaLog$.MODULE$.forTable(sparkSession, new Path(deltaLakeExtractStage.input()));
                Seq history = forTable.history().getHistory(None$.MODULE$);
                HashMap hashMap = new HashMap();
                deltaLakeExtractStage.timeTravel().foreach(timeTravel -> {
                    $anonfun$execute$1(hashMap, history, timeTravel);
                    return BoxedUnit.UNIT;
                });
                Dataset load = sparkSession.read().format("delta").options(hashMap).load(deltaLakeExtractStage.input());
                Tuple2 tuple2 = new Tuple2(((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(hashMap).asScala()).get("versionAsOf"), ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(hashMap).asScala()).get("timestampAsOf"));
                if (tuple2 != null) {
                    Option option = (Option) tuple2._1();
                    Option option2 = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option) && None$.MODULE$.equals(option2)) {
                        commitInfo = (CommitInfo) ((SeqLike) ((SeqLike) history.sortBy(commitInfo2 -> {
                            return commitInfo2.version();
                        }, Ordering$.MODULE$.Option(Ordering$Long$.MODULE$))).reverse()).apply(0);
                        CommitInfo commitInfo3 = commitInfo;
                        HashMap hashMap2 = new HashMap();
                        hashMap2.put("version", Long.valueOf(commitInfo3.getVersion()));
                        hashMap2.put("timestamp", Instant.ofEpochMilli(commitInfo3.getTimestamp()).atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                        deltaLakeExtractStage.stageDetail().put("commit", hashMap2);
                        dataset = load;
                    }
                }
                if (tuple2 != null) {
                    Some some = (Option) tuple2._1();
                    Option option3 = (Option) tuple2._2();
                    if (some instanceof Some) {
                        String str = (String) some.value();
                        if (None$.MODULE$.equals(option3)) {
                            Tuple2 resolveTimeTravelVersion = DeltaTableUtils$.MODULE$.resolveTimeTravelVersion(sparkSession.sessionState().conf(), forTable, new DeltaTimeTravelSpec(None$.MODULE$, new Some(BoxesRunTime.boxToLong(new StringOps(Predef$.MODULE$.augmentString(str)).toLong())), None$.MODULE$));
                            if (resolveTimeTravelVersion == null) {
                                throw new MatchError(resolveTimeTravelVersion);
                            }
                            long _1$mcJ$sp = resolveTimeTravelVersion._1$mcJ$sp();
                            commitInfo = (CommitInfo) ((SeqLike) history.filter(commitInfo4 -> {
                                return BoxesRunTime.boxToBoolean($anonfun$execute$9(_1$mcJ$sp, commitInfo4));
                            })).apply(0);
                            CommitInfo commitInfo32 = commitInfo;
                            HashMap hashMap22 = new HashMap();
                            hashMap22.put("version", Long.valueOf(commitInfo32.getVersion()));
                            hashMap22.put("timestamp", Instant.ofEpochMilli(commitInfo32.getTimestamp()).atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                            deltaLakeExtractStage.stageDetail().put("commit", hashMap22);
                            dataset = load;
                        }
                    }
                }
                if (tuple2 != null) {
                    Option option4 = (Option) tuple2._1();
                    Some some2 = (Option) tuple2._2();
                    if (None$.MODULE$.equals(option4) && (some2 instanceof Some)) {
                        Tuple2 resolveTimeTravelVersion2 = DeltaTableUtils$.MODULE$.resolveTimeTravelVersion(sparkSession.sessionState().conf(), forTable, new DeltaTimeTravelSpec(new Some(Literal$.MODULE$.apply((String) some2.value())), None$.MODULE$, None$.MODULE$));
                        if (resolveTimeTravelVersion2 == null) {
                            throw new MatchError(resolveTimeTravelVersion2);
                        }
                        long _1$mcJ$sp2 = resolveTimeTravelVersion2._1$mcJ$sp();
                        commitInfo = (CommitInfo) ((SeqLike) history.filter(commitInfo5 -> {
                            return BoxesRunTime.boxToBoolean($anonfun$execute$10(_1$mcJ$sp2, commitInfo5));
                        })).apply(0);
                        CommitInfo commitInfo322 = commitInfo;
                        HashMap hashMap222 = new HashMap();
                        hashMap222.put("version", Long.valueOf(commitInfo322.getVersion()));
                        hashMap222.put("timestamp", Instant.ofEpochMilli(commitInfo322.getTimestamp()).atZone(ZoneId.systemDefault()).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME));
                        deltaLakeExtractStage.stageDetail().put("commit", hashMap222);
                        dataset = load;
                    }
                }
                if (tuple2 != null) {
                    Option option5 = (Option) tuple2._1();
                    Option option6 = (Option) tuple2._2();
                    if ((option5 instanceof Some) && (option6 instanceof Some)) {
                        throw new Exception("invalid state please raise issue.");
                    }
                }
                throw new MatchError(tuple2);
            }
            dataset = sparkSession.readStream().format("delta").load(deltaLakeExtractStage.input());
            Dataset dataset4 = dataset;
            List<String> partitionBy = deltaLakeExtractStage.partitionBy();
            if (Nil$.MODULE$.equals(partitionBy)) {
                Some numPartitions = deltaLakeExtractStage.numPartitions();
                if (numPartitions instanceof Some) {
                    dataset3 = dataset4.repartition(BoxesRunTime.unboxToInt(numPartitions.value()));
                } else {
                    if (!None$.MODULE$.equals(numPartitions)) {
                        throw new MatchError(numPartitions);
                    }
                    dataset3 = dataset4;
                }
                dataset2 = dataset3;
            } else {
                List list = (List) partitionBy.map(str2 -> {
                    return dataset4.apply(str2);
                }, List$.MODULE$.canBuildFrom());
                Some numPartitions2 = deltaLakeExtractStage.numPartitions();
                if (numPartitions2 instanceof Some) {
                    repartition = dataset4.repartition(BoxesRunTime.unboxToInt(numPartitions2.value()), list);
                } else {
                    if (!None$.MODULE$.equals(numPartitions2)) {
                        throw new MatchError(numPartitions2);
                    }
                    repartition = dataset4.repartition(list);
                }
                dataset2 = repartition;
            }
            Dataset dataset5 = dataset2;
            dataset5.createOrReplaceTempView(deltaLakeExtractStage.outputView());
            if (dataset5.isStreaming()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                deltaLakeExtractStage.stageDetail().put("inputFiles", Integer.valueOf(dataset5.inputFiles().length));
                deltaLakeExtractStage.stageDetail().put("outputColumns", Integer.valueOf(dataset5.schema().length()));
                deltaLakeExtractStage.stageDetail().put("numPartitions", Integer.valueOf(dataset5.rdd().partitions().length));
                if (deltaLakeExtractStage.persist()) {
                    dataset5.persist(aRCContext.storageLevel());
                    deltaLakeExtractStage.stageDetail().put("records", Long.valueOf(dataset5.count()));
                } else {
                    BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                }
            }
            return Option$.MODULE$.apply(dataset5);
        } catch (Exception e) {
            throw new DeltaLakeExtractStage$$anon$1(e, deltaLakeExtractStage);
        }
    }

    public DeltaLakeExtractStage apply(DeltaLakeExtract deltaLakeExtract, String str, Option<String> option, String str2, String str3, Option<API.Authentication> option2, Map<String, String> map, boolean z, Option<Object> option3, List<String> list, Option<TimeTravel> option4) {
        return new DeltaLakeExtractStage(deltaLakeExtract, str, option, str2, str3, option2, map, z, option3, list, option4);
    }

    public Option<Tuple11<DeltaLakeExtract, String, Option<String>, String, String, Option<API.Authentication>, Map<String, String>, Object, Option<Object>, List<String>, Option<TimeTravel>>> unapply(DeltaLakeExtractStage deltaLakeExtractStage) {
        return deltaLakeExtractStage == null ? None$.MODULE$ : new Some(new Tuple11(deltaLakeExtractStage.m1plugin(), deltaLakeExtractStage.name(), deltaLakeExtractStage.description(), deltaLakeExtractStage.input(), deltaLakeExtractStage.outputView(), deltaLakeExtractStage.authentication(), deltaLakeExtractStage.params(), BoxesRunTime.boxToBoolean(deltaLakeExtractStage.persist()), deltaLakeExtractStage.numPartitions(), deltaLakeExtractStage.partitionBy(), deltaLakeExtractStage.timeTravel()));
    }

    private Object readResolve() {
        return MODULE$;
    }

    public static final /* synthetic */ String $anonfun$execute$3(HashMap hashMap, int i) {
        return (String) hashMap.put("versionAsOf", BoxesRunTime.boxToInteger(i).toString());
    }

    public static final /* synthetic */ String $anonfun$execute$4(Seq seq, HashMap hashMap, int i) {
        Seq seq2 = (Seq) seq.map(commitInfo -> {
            return BoxesRunTime.boxToLong(commitInfo.getVersion());
        }, Seq$.MODULE$.canBuildFrom());
        long unboxToLong = BoxesRunTime.unboxToLong(seq2.reduceLeft((j, j2) -> {
            return RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(j), j2);
        }));
        long unboxToLong2 = BoxesRunTime.unboxToLong(seq2.reduceLeft((j3, j4) -> {
            return RichLong$.MODULE$.max$extension(Predef$.MODULE$.longWrapper(j3), j4);
        }));
        long j5 = unboxToLong2 - unboxToLong;
        if (i < j5 * (-1)) {
            throw new Exception(new StringBuilder(74).append("Cannot time travel Delta table to version ").append(i).append(". Available versions: [-").append(j5).append(" ... 0].").toString());
        }
        return (String) hashMap.put("versionAsOf", BoxesRunTime.boxToLong(unboxToLong2 + i).toString());
    }

    public static final /* synthetic */ void $anonfun$execute$1(HashMap hashMap, Seq seq, TimeTravel timeTravel) {
        timeTravel.timestampAsOf().foreach(str -> {
            return (String) hashMap.put("timestampAsOf", str);
        });
        timeTravel.versionAsOf().foreach(obj -> {
            return $anonfun$execute$3(hashMap, BoxesRunTime.unboxToInt(obj));
        });
        timeTravel.relativeVersion().foreach(obj2 -> {
            return $anonfun$execute$4(seq, hashMap, BoxesRunTime.unboxToInt(obj2));
        });
    }

    public static final /* synthetic */ boolean $anonfun$execute$9(long j, CommitInfo commitInfo) {
        return commitInfo.getVersion() == j;
    }

    public static final /* synthetic */ boolean $anonfun$execute$10(long j, CommitInfo commitInfo) {
        return commitInfo.getVersion() == j;
    }

    private DeltaLakeExtractStage$() {
        MODULE$ = this;
    }
}
