package it.agilelab.bigdata.wasp.consumers.spark.streaming;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import it.agilelab.bigdata.wasp.consumers.spark.MlModels.MlModelsDB;
import it.agilelab.bigdata.wasp.consumers.spark.SparkSingletons$;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata;
import it.agilelab.bigdata.wasp.consumers.spark.metadata.Path;
import it.agilelab.bigdata.wasp.consumers.spark.plugins.WaspConsumersSparkPlugin;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkBatchReader;
import it.agilelab.bigdata.wasp.consumers.spark.readers.SparkLegacyStreamingReader;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.ReaderKey;
import it.agilelab.bigdata.wasp.consumers.spark.strategies.Strategy;
import it.agilelab.bigdata.wasp.consumers.spark.utils.MetadataUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkWriterFactory;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct;
import it.agilelab.bigdata.wasp.datastores.DatastoreProduct$;
import it.agilelab.bigdata.wasp.datastores.GenericProduct;
import it.agilelab.bigdata.wasp.models.LegacyStreamingETLModel;
import it.agilelab.bigdata.wasp.models.PipegraphModel;
import it.agilelab.bigdata.wasp.models.ReaderModel;
import it.agilelab.bigdata.wasp.models.StrategyModel;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.repository.core.bl.TopicBL;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.UUID;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.functions$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.StructuralCallSite;

/* compiled from: LegacyStreamingETLActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\tUb\u0001\u0002\u000b\u0016\u0001\u0011B\u0001b\u000f\u0001\u0003\u0002\u0003\u0006I\u0001\u0010\u0005\tG\u0002\u0011\t\u0011)A\u0005I\"A!\u000e\u0001B\u0001B\u0003%1\u000e\u0003\u0005r\u0001\t\u0005\t\u0015!\u0003s\u0011!a\bA!A!\u0002\u0013i\bBCA\u0004\u0001\t\u0005\t\u0015!\u0003\u0002\n!Q\u0011q\u0002\u0001\u0003\u0002\u0003\u0006I!!\u0005\t\u0015\u0005]\u0001A!A!\u0002\u0013\tI\u0002C\u0004\u0002F\u0001!\t!a\u0012\t\u000f\u0005-\u0004\u0001\"\u0011\u0002n!9\u0011Q\u0010\u0001\u0005B\u0005}\u0004BCAD\u0001!\u0015\r\u0011\"\u0003\u0002\n\"9\u0011Q\u0014\u0001\u0005\n\u0005}\u0005bBAc\u0001\u0011%\u0011q\u0019\u0005\b\u0003\u0017\u0004A\u0011BAg\u0011\u001d\tI\u000e\u0001C\u0005\u0003\u007fBq!a7\u0001\t\u0003\ty\bC\u0004\u0002^\u0002!I!a8\t\u000f\t-\u0001\u0001\"\u0003\u0003\u000e\t9B*Z4bGf\u001cFO]3b[&tw-\u0012+M\u0003\u000e$xN\u001d\u0006\u0003-]\t\u0011b\u001d;sK\u0006l\u0017N\\4\u000b\u0005aI\u0012!B:qCJ\\'B\u0001\u000e\u001c\u0003%\u0019wN\\:v[\u0016\u00148O\u0003\u0002\u001d;\u0005!q/Y:q\u0015\tqr$A\u0004cS\u001e$\u0017\r^1\u000b\u0005\u0001\n\u0013\u0001C1hS2,G.\u00192\u000b\u0003\t\n!!\u001b;\u0004\u0001M!\u0001!J\u00164!\t1\u0013&D\u0001(\u0015\u0005A\u0013!B:dC2\f\u0017B\u0001\u0016(\u0005\u0019\te.\u001f*fMB\u0011A&M\u0007\u0002[)\u0011afL\u0001\u0006C\u000e$xN\u001d\u0006\u0002a\u0005!\u0011m[6b\u0013\t\u0011TFA\u0003BGR|'\u000f\u0005\u00025s5\tQG\u0003\u00027o\u00059An\\4hS:<'B\u0001\u001d\u001c\u0003\u0011\u0019wN]3\n\u0005i*$a\u0002'pO\u001eLgnZ\u0001\u0004K:4(CA\u001f&\r\u0011q\u0004\u0001\u0001\u001f\u0003\u0019q\u0012XMZ5oK6,g\u000e\u001e \t\u000f\u0001k$\u0019!D\u0001\u0003\u00069Ao\u001c9jG\ncU#\u0001\"\u0011\u0005\rKU\"\u0001#\u000b\u0005\u00153\u0015A\u00012m\u0015\tAtI\u0003\u0002I7\u0005Q!/\u001a9pg&$xN]=\n\u0005)#%a\u0002+pa&\u001c'\t\u0014\u0005\b\u0019v\u0012\rQ\"\u0001N\u0003\u001dIg\u000eZ3y\u00052+\u0012A\u0014\t\u0003\u0007>K!\u0001\u0015#\u0003\u000f%sG-\u001a=C\u0019\"9!+\u0010b\u0001\u000e\u0003\u0019\u0016!\u0002:bo\ncU#\u0001+\u0011\u0005\r+\u0016B\u0001,E\u0005\u0015\u0011\u0016m\u001e\"M\u0011\u001dAVH1A\u0007\u0002e\u000b!b[3z-\u0006dW/\u001a\"M+\u0005Q\u0006CA\"\\\u0013\taFI\u0001\u0006LKf4\u0016\r\\;f\u00052CqAX\u001fC\u0002\u001b\u0005q,A\u0005nY6{G-\u001a7C\u0019V\t\u0001\r\u0005\u0002DC&\u0011!\r\u0012\u0002\n\u001b2lu\u000eZ3m\u00052\u000b!c\u001d9be.<&/\u001b;fe\u001a\u000b7\r^8ssB\u0011Q\r[\u0007\u0002M*\u0011qmF\u0001\boJLG/\u001a:t\u0013\tIgM\u0001\nTa\u0006\u00148n\u0016:ji\u0016\u0014h)Y2u_JL\u0018aD:ue\u0016\fW.\u001b8h%\u0016\fG-\u001a:\u0011\u00051|W\"A7\u000b\u00059<\u0012a\u0002:fC\u0012,'o]\u0005\u0003a6\u0014!d\u00159be.dUmZ1dsN#(/Z1nS:<'+Z1eKJ\f1a]:d!\t\u0019(0D\u0001u\u0015\t1RO\u0003\u0002\u0019m*\u0011q\u000f_\u0001\u0007CB\f7\r[3\u000b\u0003e\f1a\u001c:h\u0013\tYHO\u0001\tTiJ,\u0017-\\5oO\u000e{g\u000e^3yi\u0006I\u0001/\u001b9fOJ\f\u0007\u000f\u001b\t\u0004}\u0006\rQ\"A@\u000b\u0007\u0005\u00051$\u0001\u0004n_\u0012,Gn]\u0005\u0004\u0003\u000by(A\u0004)ja\u0016<'/\u00199i\u001b>$W\r\\\u0001\u0013Y\u0016<\u0017mY=TiJ,\u0017-\\5oO\u0016#F\nE\u0002\u007f\u0003\u0017I1!!\u0004��\u0005]aUmZ1dsN#(/Z1nS:<W\t\u0016'N_\u0012,G.\u0001\u0005mSN$XM\\3s!\ra\u00131C\u0005\u0004\u0003+i#\u0001C!di>\u0014(+\u001a4\u0002\u000fAdWoZ5ogBA\u00111DA\u0015\u0003_\tYD\u0004\u0003\u0002\u001e\u0005\u0015\u0002cAA\u0010O5\u0011\u0011\u0011\u0005\u0006\u0004\u0003G\u0019\u0013A\u0002\u001fs_>$h(C\u0002\u0002(\u001d\na\u0001\u0015:fI\u00164\u0017\u0002BA\u0016\u0003[\u00111!T1q\u0015\r\t9c\n\t\u0005\u0003c\t9$\u0004\u0002\u00024)\u0019\u0011QG\u000e\u0002\u0015\u0011\fG/Y:u_J,7/\u0003\u0003\u0002:\u0005M\"\u0001\u0005#bi\u0006\u001cHo\u001c:f!J|G-^2u!\u0011\ti$!\u0011\u000e\u0005\u0005}\"bAA\f/%!\u00111IA \u0005a9\u0016m\u001d9D_:\u001cX/\\3sgN\u0003\u0018M]6QYV<\u0017N\\\u0001\u0007y%t\u0017\u000e\u001e \u0015%\u0005%\u0013QJA/\u0003?\n\t'a\u0019\u0002f\u0005\u001d\u0014\u0011\u000e\t\u0004\u0003\u0017\u0002Q\"A\u000b\t\rmJ\u0001\u0019AA(%\r\t\t&\n\u0004\u0006}\u0001\u0001\u0011q\n\u0005\t\u0001\u0006E#\u0019!D\u0001\u0003\"AA*!\u0015C\u0002\u001b\u0005Q\n\u0003\u0005S\u0003#\u0012\rQ\"\u0001T\u0011!A\u0016\u0011\u000bb\u0001\u000e\u0003I\u0006\u0002\u00030\u0002R\t\u0007i\u0011A0\t\u000b\rL\u0001\u0019\u00013\t\u000b)L\u0001\u0019A6\t\u000bEL\u0001\u0019\u0001:\t\u000bqL\u0001\u0019A?\t\u000f\u0005\u001d\u0011\u00021\u0001\u0002\n!9\u0011qB\u0005A\u0002\u0005E\u0001bBA\f\u0013\u0001\u0007\u0011\u0011D\u0001\be\u0016\u001cW-\u001b<f+\t\ty\u0007\u0005\u0003\u0002r\u0005]db\u0001\u0017\u0002t%\u0019\u0011QO\u0017\u0002\u000b\u0005\u001bGo\u001c:\n\t\u0005e\u00141\u0010\u0002\b%\u0016\u001cW-\u001b<f\u0015\r\t)(L\u0001\taJ,7\u000b^1siR\u0011\u0011\u0011\u0011\t\u0004M\u0005\r\u0015bAACO\t!QK\\5u\u00039\u0019'/Z1uKN#(/\u0019;fOf,\"!a#\u0011\u000b\u0019\ni)!%\n\u0007\u0005=uE\u0001\u0004PaRLwN\u001c\t\u0005\u0003'\u000bI*\u0004\u0002\u0002\u0016*\u0019\u0011qS\f\u0002\u0015M$(/\u0019;fO&,7/\u0003\u0003\u0002\u001c\u0006U%\u0001C*ue\u0006$XmZ=\u0002!\u0005dGn\u0015;bi&\u001c'+Z1eKJ\u001cH\u0003BAQ\u0003s\u0003b!a)\u0002.\u0006Mf\u0002BAS\u0003SsA!a\b\u0002(&\t\u0001&C\u0002\u0002,\u001e\nq\u0001]1dW\u0006<W-\u0003\u0003\u00020\u0006E&\u0001\u0002'jgRT1!a+(!\ra\u0017QW\u0005\u0004\u0003ok'\u0001E*qCJ\\')\u0019;dQJ+\u0017\rZ3s\u0011\u001d\tY,\u0004a\u0001\u0003{\u000b!c\u001d;bi&\u001c'+Z1eKJlu\u000eZ3mgB1\u00111UAW\u0003\u007f\u00032A`Aa\u0013\r\t\u0019m \u0002\f%\u0016\fG-\u001a:N_\u0012,G.A\u000bsKR\u0014\u0018.\u001a<f'R\fG/[2SK\u0006$WM]:\u0015\t\u0005\u0005\u0016\u0011\u001a\u0005\b\u0003ws\u0001\u0019AA_\u0003-!x\u000e]5d\u001b>$W\r\\:\u0015\u0005\u0005=\u0007CBAR\u0003[\u000b\t\u000eE\u0003'\u0003\u001b\u000b\u0019\u000eE\u0002\u007f\u0003+L1!a6��\u0005)!v\u000e]5d\u001b>$W\r\\\u0001\u000fm\u0006d\u0017\u000eZ1uS>tG+Y:l\u0003!i\u0017-\u001b8UCN\\\u0017a\u0003:fiJLWM^3E\rN$B!!9\u0003\nAA\u00111DA\u0015\u0003G\fI\u000f\u0005\u0003\u0002\u0014\u0006\u0015\u0018\u0002BAt\u0003+\u0013\u0011BU3bI\u0016\u00148*Z=\u0011\t\u0005-(1\u0001\b\u0005\u0003[\fyP\u0004\u0003\u0002p\u0006mh\u0002BAy\u0003stA!a=\u0002x:!\u0011qDA{\u0013\u0005I\u0018BA<y\u0013\tAb/C\u0002\u0002~V\f1a]9m\u0013\u0011\tYK!\u0001\u000b\u0007\u0005uX/\u0003\u0003\u0003\u0006\t\u001d!!\u0003#bi\u00064%/Y7f\u0015\u0011\tYK!\u0001\t\u000f\u0005m&\u00031\u0001\u0002>\u0006IAO]1og\u001a|'/\u001c\u000b\r\u0005\u001f\u0011\tC!\n\u0003*\t5\"\u0011\u0007\t\u0007\u0005#\u00119Ba\u0007\u000e\u0005\tM!b\u0001B\u000bi\u00069Am\u001d;sK\u0006l\u0017\u0002\u0002B\r\u0005'\u0011q\u0001R*ue\u0016\fW\u000e\u0005\u0003\u0002\u001c\tu\u0011\u0002\u0002B\u0010\u0003[\u0011aa\u0015;sS:<\u0007b\u0002B\u0012'\u0001\u0007\u00111]\u0001\ne\u0016\fG-\u001a:LKfDqAa\n\u0014\u0001\u0004\u0011y!\u0001\u0004tiJ,\u0017-\u001c\u0005\b\u0005W\u0019\u0002\u0019AAq\u00031!\u0017\r^1Ti>\u0014X\r\u0012$t\u0011\u001d\u0011yc\u0005a\u0001\u0003#\u000b\u0001b\u001d;sCR,w-\u001f\u0005\b\u0005g\u0019\u0002\u0019AA\u0018\u0003A!\u0017\r^1ti>\u0014X\r\u0015:pIV\u001cG\u000f")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/streaming/LegacyStreamingETLActor.class */
public class LegacyStreamingETLActor implements Actor, Logging {
    private Option<Strategy> createStrategy;
    private final Object env;
    private final SparkWriterFactory sparkWriterFactory;
    private final SparkLegacyStreamingReader streamingReader;
    private final StreamingContext ssc;
    private final PipegraphModel pipegraph;
    private final LegacyStreamingETLModel legacyStreamingETL;
    private final ActorRef listener;
    private final Map<DatastoreProduct, WaspConsumersSparkPlugin> plugins;
    private final WaspLogger logger;
    private final ActorContext context;
    private final ActorRef self;
    private volatile boolean bitmap$0;

    public static Method reflMethod$Method1(Class cls) {
        StructuralCallSite apply = (StructuralCallSite) StructuralCallSite.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(StructuralCallSite.class), MethodType.methodType(Object.class)).dynamicInvoker().invoke() /* invoke-custom */;
        Method find = apply.find(cls);
        if (find != null) {
            return find;
        }
        Method ensureAccessible = ScalaRunTime$.MODULE$.ensureAccessible(cls.getMethod("topicBL", apply.parameterTypes()));
        apply.add(cls, ensureAccessible);
        return ensureAccessible;
    }

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void postStop() throws Exception {
        Actor.postStop$(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return new LegacyStreamingETLActor$$anonfun$receive$1(this);
    }

    public void preStart() {
        logger().info(() -> {
            return "Actor is transitioning from 'uninitialized' to 'initialized'";
        });
        try {
            validationTask();
            mainTask();
        } catch (Error e) {
            String sb = new StringBuilder(50).append("Pipegraph '").append(this.pipegraph.name()).append("' - LegacyStreamingETLActor '").append(this.legacyStreamingETL.name()).append("': Error: ").append(e.getMessage()).toString();
            logger().error(() -> {
                return sb;
            }, e);
            package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Left().apply(sb), self());
        } catch (Exception e2) {
            String sb2 = new StringBuilder(54).append("Pipegraph '").append(this.pipegraph.name()).append("' - LegacyStreamingETLActor '").append(this.legacyStreamingETL.name()).append("': Exception: ").append(e2.getMessage()).toString();
            logger().error(() -> {
                return sb2;
            }, e2);
            package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Left().apply(sb2), self());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Option<Strategy> createStrategy$lzycompute() {
        Config config;
        None$ some;
        synchronized (this) {
            if (!this.bitmap$0) {
                Some strategy = this.legacyStreamingETL.strategy();
                if (None$.MODULE$.equals(strategy)) {
                    some = None$.MODULE$;
                } else {
                    if (!(strategy instanceof Some)) {
                        throw new MatchError(strategy);
                    }
                    StrategyModel strategyModel = (StrategyModel) strategy.value();
                    Strategy strategy2 = (Strategy) Class.forName(strategyModel.className()).newInstance();
                    Some configurationConfig = strategyModel.configurationConfig();
                    if (None$.MODULE$.equals(configurationConfig)) {
                        config = ConfigFactory.empty();
                    } else {
                        if (!(configurationConfig instanceof Some)) {
                            throw new MatchError(configurationConfig);
                        }
                        config = (Config) configurationConfig.value();
                    }
                    strategy2.configuration_$eq(config);
                    logger().info(() -> {
                        return new StringBuilder(10).append("strategy: ").append(strategy2).toString();
                    });
                    some = new Some(strategy2);
                }
                this.createStrategy = some;
                this.bitmap$0 = true;
            }
        }
        return this.createStrategy;
    }

    private Option<Strategy> createStrategy() {
        return !this.bitmap$0 ? createStrategy$lzycompute() : this.createStrategy;
    }

    private List<SparkBatchReader> allStaticReaders(List<ReaderModel> list) {
        return (List) list.flatMap(readerModel -> {
            Iterable option2Iterable;
            DatastoreProduct datastoreProduct = readerModel.datastoreProduct();
            this.logger().info(() -> {
                return new StringBuilder(44).append("Finding reader plugin for datastore product ").append(datastoreProduct).toString();
            });
            Option option = this.plugins.get(datastoreProduct);
            if (option.isDefined()) {
                option2Iterable = Option$.MODULE$.option2Iterable(new Some(((WaspConsumersSparkPlugin) option.get()).getSparkBatchReader(this.ssc.sparkContext(), readerModel)));
            } else {
                this.logger().error(() -> {
                    return new StringBuilder(39).append("No plugin found for datastore product ").append(datastoreProduct).append("!").toString();
                });
                option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
            }
            return option2Iterable;
        }, List$.MODULE$.canBuildFrom());
    }

    private List<SparkBatchReader> retrieveStaticReaders(List<ReaderModel> list) {
        return allStaticReaders(list);
    }

    private List<Option<TopicModel>> topicModels() {
        return (List) this.legacyStreamingETL.inputs().flatMap(readerModel -> {
            Iterable option2Iterable;
            if (readerModel != null) {
                String datastoreModelName = readerModel.datastoreModelName();
                DatastoreProduct datastoreProduct = readerModel.datastoreProduct();
                GenericProduct KafkaProduct = DatastoreProduct$.MODULE$.KafkaProduct();
                if (KafkaProduct != null ? KafkaProduct.equals(datastoreProduct) : datastoreProduct == null) {
                    Object obj = this.env;
                    try {
                        option2Iterable = Option$.MODULE$.option2Iterable(new Some(((TopicBL) reflMethod$Method1(obj.getClass()).invoke(obj, new Object[0])).getTopicModelByName(datastoreModelName)));
                        return option2Iterable;
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    }
                }
            }
            option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
            return option2Iterable;
        }, List$.MODULE$.canBuildFrom());
    }

    private void validationTask() {
        int count = this.legacyStreamingETL.inputs().count(readerModel -> {
            return BoxesRunTime.boxToBoolean($anonfun$validationTask$1(readerModel));
        });
        if (count == 0) {
            throw new Exception(new StringBuilder(40).append("There is NO topic to read data, inputs: ").append(this.legacyStreamingETL.inputs()).toString());
        }
        if (count != 1) {
            throw new Exception(new StringBuilder(32).append("MUST be only ONE topic, inputs: ").append(this.legacyStreamingETL.inputs()).toString());
        }
    }

    public void mainTask() {
        DStream<String> dStream;
        List list = (List) topicModels().map(option -> {
            Predef$.MODULE$.assert(option.isDefined());
            TopicModel topicModel = (TopicModel) option.get();
            return new Tuple2(new ReaderKey(DatastoreProduct$.MODULE$.GenericTopicProduct().categoryName(), topicModel.name()), this.streamingReader.createStream(this.legacyStreamingETL.group(), this.legacyStreamingETL.kafkaAccessType(), topicModel, this.ssc));
        }, List$.MODULE$.canBuildFrom());
        Predef$.MODULE$.assert(list.nonEmpty());
        Predef$.MODULE$.assert(list.size() == 1);
        Tuple2 tuple2 = (Tuple2) list.head();
        if (createStrategy().isDefined()) {
            Strategy strategy = (Strategy) createStrategy().get();
            List<ReaderModel> list2 = (List) this.legacyStreamingETL.inputs().filterNot(readerModel -> {
                return BoxesRunTime.boxToBoolean($anonfun$mainTask$2(readerModel));
            });
            Map<ReaderKey, Dataset<Row>> empty = list2.isEmpty() ? Predef$.MODULE$.Map().empty() : retrieveDFs(list2);
            int size = list2.size();
            int size2 = empty.size();
            if (size2 != size) {
                String sb = new StringBuilder(32).append("DFs not retrieved successfully!\n").append(new StringBuilder(32).append(size).append(" DFs required - ").append(size2).append(" DFs retrieved!\n").toString()).append(empty.toString()).toString();
                logger().error(() -> {
                    return sb;
                });
                throw new Exception(new StringBuilder(62).append("DFs not retrieved successful - ").append(size).append(" DFs required - ").append(size2).append(" DFs retrieved!").toString());
            }
            if (!empty.isEmpty()) {
                logger().info(() -> {
                    return "DFs retrieved successfully!";
                });
            }
            strategy.mlModelsBroadcast_$eq(new MlModelsDB(this.env).createModelsBroadcast(this.legacyStreamingETL.mlModels(), this.ssc.sparkContext()));
            dStream = transform((ReaderKey) tuple2._1(), (DStream) tuple2._2(), empty, strategy, this.legacyStreamingETL.output().datastoreProduct());
        } else {
            dStream = (DStream) tuple2._2();
        }
        DStream<String> dStream2 = dStream;
        Some createSparkWriterLegacyStreaming = this.sparkWriterFactory.createSparkWriterLegacyStreaming(this.env, this.ssc, this.legacyStreamingETL, this.legacyStreamingETL.output());
        if (!(createSparkWriterLegacyStreaming instanceof Some)) {
            if (!None$.MODULE$.equals(createSparkWriterLegacyStreaming)) {
                throw new MatchError(createSparkWriterLegacyStreaming);
            }
            throw new Exception(new StringBuilder(47).append("No Spark Streaming writer available for writer ").append(this.legacyStreamingETL.output()).toString());
        }
        ((SparkLegacyStreamingWriter) createSparkWriterLegacyStreaming.value()).write(dStream2);
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
        package$.MODULE$.actorRef2Scala(this.listener).$bang(scala.package$.MODULE$.Right().apply(new StringBuilder(49).append("Pipegraph '").append(this.pipegraph.name()).append("' - LegacyStreamingETLActor '").append(this.legacyStreamingETL.name()).append("' started").toString()), self());
    }

    private Map<ReaderKey, Dataset<Row>> retrieveDFs(List<ReaderModel> list) {
        return ((TraversableOnce) retrieveStaticReaders(list).flatMap(sparkBatchReader -> {
            try {
                return Option$.MODULE$.option2Iterable(new Some(new Tuple2(new ReaderKey(sparkBatchReader.readerType(), sparkBatchReader.name()), sparkBatchReader.read(this.ssc.sparkContext()))));
            } catch (Exception e) {
                this.logger().error(() -> {
                    return new StringBuilder(28).append("Error during retrieving DF: ").append(sparkBatchReader.name()).toString();
                }, e);
                return Option$.MODULE$.option2Iterable(None$.MODULE$);
            }
        }, List$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    private DStream<String> transform(ReaderKey readerKey, DStream<String> dStream, Map<ReaderKey, Dataset<Row>> map, Strategy strategy, DatastoreProduct datastoreProduct) {
        SQLContext sQLContext = SparkSingletons$.MODULE$.getSQLContext();
        Broadcast broadcast = this.ssc.sparkContext().broadcast(strategy, ClassTag$.MODULE$.apply(Strategy.class));
        String name = this.legacyStreamingETL.name();
        logger().debug(() -> {
            return new StringBuilder(14).append("input stream: ").append(readerKey.name()).toString();
        });
        return dStream.transform(rdd -> {
            RDD rdd;
            Dataset json = sQLContext.read().json(rdd);
            if (!json.schema().nonEmpty()) {
                return rdd;
            }
            final LegacyStreamingETLActor legacyStreamingETLActor = null;
            final LegacyStreamingETLActor legacyStreamingETLActor2 = null;
            final LegacyStreamingETLActor legacyStreamingETLActor3 = null;
            final LegacyStreamingETLActor legacyStreamingETLActor4 = null;
            Dataset<Row> transform = ((Strategy) broadcast.value()).transform(map.$plus(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(readerKey), json.withColumn("metadata", functions$.MODULE$.udf((str, str2, obj, obj2, seq) -> {
                return $anonfun$transform$3(name, str, str2, BoxesRunTime.unboxToLong(obj), BoxesRunTime.unboxToLong(obj2), seq);
            }, scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(LegacyStreamingETLActor.class.getClassLoader()), new TypeCreator(legacyStreamingETLActor) { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.LegacyStreamingETLActor$$typecreator1$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("it.agilelab.bigdata.wasp.consumers.spark.metadata.Metadata").asType().toTypeConstructor();
                }
            }), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(LegacyStreamingETLActor.class.getClassLoader()), new TypeCreator(legacyStreamingETLActor2) { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.LegacyStreamingETLActor$$typecreator2$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            }), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(LegacyStreamingETLActor.class.getClassLoader()), new TypeCreator(legacyStreamingETLActor3) { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.LegacyStreamingETLActor$$typecreator3$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            }), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().Long(), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().Long(), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(LegacyStreamingETLActor.class.getClassLoader()), new TypeCreator(legacyStreamingETLActor4) { // from class: it.agilelab.bigdata.wasp.consumers.spark.streaming.LegacyStreamingETLActor$$typecreator4$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe = mirror.universe();
                    return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().thisPrefix(mirror.RootClass()), mirror.staticPackage("scala")), mirror.staticModule("scala.package")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.package").asModule().moduleClass(), "Seq"), new $colon.colon(mirror.staticClass("it.agilelab.bigdata.wasp.consumers.spark.metadata.Path").asType().toTypeConstructor(), Nil$.MODULE$));
                }
            })).apply(Predef$.MODULE$.wrapRefArray(new Column[]{functions$.MODULE$.col("metadata.id"), functions$.MODULE$.col("metadata.sourceId"), functions$.MODULE$.col("metadata.arrivalTimestamp"), functions$.MODULE$.col("metadata.lastSeenTimestamp"), functions$.MODULE$.col("metadata.path")}))))));
            GenericProduct KafkaProduct = DatastoreProduct$.MODULE$.KafkaProduct();
            if (KafkaProduct != null ? !KafkaProduct.equals(datastoreProduct) : datastoreProduct != null) {
                GenericProduct HBaseProduct = DatastoreProduct$.MODULE$.HBaseProduct();
                if (HBaseProduct != null ? !HBaseProduct.equals(datastoreProduct) : datastoreProduct != null) {
                    GenericProduct RawProduct = DatastoreProduct$.MODULE$.RawProduct();
                    if (RawProduct != null ? !RawProduct.equals(datastoreProduct) : datastoreProduct != null) {
                        GenericProduct ConsoleProduct = DatastoreProduct$.MODULE$.ConsoleProduct();
                        rdd = (ConsoleProduct != null ? !ConsoleProduct.equals(datastoreProduct) : datastoreProduct != null) ? transform.select(Predef$.MODULE$.wrapRefArray(MetadataUtils$.MODULE$.flatMetadataSchema(json.schema(), None$.MODULE$))).toJSON().rdd() : transform.toJSON().rdd();
                    } else {
                        rdd = transform.toJSON().rdd();
                    }
                } else {
                    rdd = transform.toJSON().rdd();
                }
            } else {
                rdd = transform.toJSON().rdd();
            }
            return rdd;
        }, ClassTag$.MODULE$.apply(String.class));
    }

    public static final /* synthetic */ boolean $anonfun$validationTask$1(ReaderModel readerModel) {
        String categoryName = readerModel.datastoreProduct().categoryName();
        String categoryName2 = DatastoreProduct$.MODULE$.GenericTopicProduct().categoryName();
        return categoryName != null ? categoryName.equals(categoryName2) : categoryName2 == null;
    }

    public static final /* synthetic */ boolean $anonfun$mainTask$2(ReaderModel readerModel) {
        String categoryName = readerModel.datastoreProduct().categoryName();
        String categoryName2 = DatastoreProduct$.MODULE$.GenericTopicProduct().categoryName();
        return categoryName != null ? categoryName.equals(categoryName2) : categoryName2 == null;
    }

    public static final /* synthetic */ Metadata $anonfun$transform$3(String str, String str2, String str3, long j, long j2, Seq seq) {
        long currentTimeMillis = System.currentTimeMillis();
        return (str2 != null ? !str2.equals("") : "" != 0) ? new Metadata(str2, str3, j, currentTimeMillis, (Path[]) ((TraversableOnce) seq.$colon$plus(new Path(str, currentTimeMillis), Seq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(Path.class))) : new Metadata(UUID.randomUUID().toString(), str3, currentTimeMillis, currentTimeMillis, (Path[]) Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Path[]{new Path(str, currentTimeMillis)})).toArray(ClassTag$.MODULE$.apply(Path.class)));
    }

    public LegacyStreamingETLActor(Object obj, SparkWriterFactory sparkWriterFactory, SparkLegacyStreamingReader sparkLegacyStreamingReader, StreamingContext streamingContext, PipegraphModel pipegraphModel, LegacyStreamingETLModel legacyStreamingETLModel, ActorRef actorRef, Map<DatastoreProduct, WaspConsumersSparkPlugin> map) {
        this.env = obj;
        this.sparkWriterFactory = sparkWriterFactory;
        this.streamingReader = sparkLegacyStreamingReader;
        this.ssc = streamingContext;
        this.pipegraph = pipegraphModel;
        this.legacyStreamingETL = legacyStreamingETLModel;
        this.listener = actorRef;
        this.plugins = map;
        Actor.$init$(this);
        Logging.$init$(this);
    }
}
