package it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl;

import it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Path;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Path$;
import it.agilelab.bigdata.wasp.consumers.spark.utils.RowEncoderUtils$;
import java.time.Clock;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions$;
import scala.Function5;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.SeqLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;

/* compiled from: ActivationSteps.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/etl/MetadataOps$.class */
public final class MetadataOps$ {
    public static MetadataOps$ MODULE$;

    static {
        new MetadataOps$();
    }

    public Dataset<Row> enter(String str, Dataset<Row> dataset) {
        return on(new StringBuilder(6).append(str.replace(' ', '-')).append("-enter").toString(), dataset);
    }

    public Dataset<Row> exit(String str, Dataset<Row> dataset) {
        return on(new StringBuilder(5).append(str.replace(' ', '-')).append("-exit").toString(), dataset);
    }

    private Dataset<Row> on(String str, Dataset<Row> dataset) {
        if (!new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).contains("metadata")) {
            return dataset;
        }
        String[] columns = dataset.columns();
        return dataset.withColumn("metadata_new", updateMetadata(str).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("metadata.id"), functions$.MODULE$.col("metadata.sourceId"), functions$.MODULE$.col("metadata.arrivalTimestamp"), functions$.MODULE$.col("metadata.lastSeenTimestamp"), functions$.MODULE$.col("metadata.path")}))).drop("metadata").withColumnRenamed("metadata_new", "metadata").select((String) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(columns)).head(), Predef$.MODULE$.wrapRefArray((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(columns)).tail()));
    }

    public UserDefinedFunction updateMetadata(String str) {
        functions$ functions_ = functions$.MODULE$;
        Function5 function5 = (str2, str3, obj, obj2, seq) -> {
            return $anonfun$updateMetadata$1(str, str2, str3, BoxesRunTime.unboxToLong(obj), BoxesRunTime.unboxToLong(obj2), seq);
        };
        TypeTags universe = package$.MODULE$.universe();
        TypeTags.TypeTag apply = universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetadataOps$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                mirror.universe();
                return mirror.staticClass("it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata").asType().toTypeConstructor();
            }
        });
        TypeTags universe2 = package$.MODULE$.universe();
        TypeTags.TypeTag apply2 = universe2.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetadataOps$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe3 = mirror.universe();
                return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe3.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        });
        TypeTags universe3 = package$.MODULE$.universe();
        TypeTags.TypeTag apply3 = universe3.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetadataOps$$typecreator3$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe4 = mirror.universe();
                return universe4.internal().reificationSupport().TypeRef(universe4.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe4.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        });
        TypeTags.TypeTag Long = package$.MODULE$.universe().TypeTag().Long();
        TypeTags.TypeTag Long2 = package$.MODULE$.universe().TypeTag().Long();
        TypeTags universe4 = package$.MODULE$.universe();
        return functions_.udf(function5, apply, apply2, apply3, Long, Long2, universe4.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(getClass().getClassLoader()), new TypeCreator() { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl.MetadataOps$$typecreator4$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe5 = mirror.universe();
                return universe5.internal().reificationSupport().TypeRef(universe5.internal().reificationSupport().SingleType(universe5.internal().reificationSupport().SingleType(universe5.internal().reificationSupport().thisPrefix(mirror.RootClass()), mirror.staticPackage("scala")), mirror.staticModule("scala.package")), universe5.internal().reificationSupport().selectType(mirror.staticModule("scala.package").asModule().moduleClass(), "Seq"), new $colon.colon(mirror.staticClass("org.apache.spark.sql.Row").asType().toTypeConstructor(), Nil$.MODULE$));
            }
        }));
    }

    public Dataset<Row> sendLatencyMessage(Dataset<Row> dataset, TelemetryMetadataProducerConfig telemetryMetadataProducerConfig, int i) {
        if (!new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(dataset.columns())).contains("metadata")) {
            return dataset;
        }
        return dataset.mapPartitions(iterator -> {
            IntRef create = IntRef.create(0);
            return iterator.map(row -> {
                if (create.elem % i == 0) {
                    Row struct = row.getStruct(row.fieldIndex("metadata"));
                    int fieldIndex = struct.fieldIndex("path");
                    String string = struct.getString(struct.fieldIndex("id"));
                    Seq seq = (Seq) ((SeqLike) struct.getSeq(fieldIndex).map(row -> {
                        return Path$.MODULE$.apply(row);
                    }, Seq$.MODULE$.canBuildFrom())).$plus$colon(new Path(struct.getString(struct.fieldIndex("sourceId")), struct.getLong(struct.fieldIndex("arrivalTimestamp"))), Seq$.MODULE$.canBuildFrom());
                    Seq seq2 = (Seq) seq.takeRight(2);
                    TelemetryMetadataProducer$.MODULE$.send(telemetryMetadataProducerConfig, string, MetricsTelemetryMessageFormat$.MODULE$.metricsTelemetryMessageFormat().write(new MetricsTelemetryMessage(string, ((TraversableOnce) seq.map(path -> {
                        return path.name().replace(' ', '-');
                    }, Seq$.MODULE$.canBuildFrom())).mkString("/"), "latencyMs", ((Path) seq2.apply(1)).ts() - ((Path) seq2.apply(0)).ts(), DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(((Path) seq2.apply(1)).ts())))));
                } else {
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
                create.elem++;
                return row;
            });
        }, RowEncoderUtils$.MODULE$.encoderFor(dataset.schema()));
    }

    public static final /* synthetic */ Metadata $anonfun$updateMetadata$1(String str, String str2, String str3, long j, long j2, Seq seq) {
        long epochMilli = Clock.systemUTC().instant().toEpochMilli();
        return new Metadata(str2, str3, j, epochMilli, (Path[]) ((TraversableOnce) ((Seq) seq.map(row -> {
            return Path$.MODULE$.apply(row);
        }, Seq$.MODULE$.canBuildFrom())).$colon$plus(new Path(str, epochMilli), Seq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(Path.class)));
    }

    private MetadataOps$() {
        MODULE$ = this;
    }
}
