package it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.etl;

import it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.consumers.BaseConsumersMasterGuadian$;
import it.agilelab.bigdata.wasp.core.utils.ConfigManager$;
import it.agilelab.bigdata.wasp.models.PipegraphModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.WriterModel;
import it.agilelab.bigdata.wasp.models.configuration.SparkStreamingConfigModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function3;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.reflect.ScalaSignature;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: MaterializationSteps.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005]eaB\b\u0011!\u0003\r\ta\t\u0005\u0006U\u0001!\ta\u000b\u0005\b_\u0001\u0011\rQ\"\u00051\u0011\u001dy\u0007A1A\u0007\u0012ADQ!\u001d\u0001\u0005\u0012IDq!a\u000b\u0001\t\u0013\ti\u0003C\u0004\u0002V\u0001!I!a\u0016\t\u000f\u0005m\u0003\u0001\"\u0003\u0002^!9\u00111\r\u0001\u0005\n\u0005\u0015\u0004bBA>\u0001\u0011%\u0011Q\u0010\u0005\b\u0003\u0017\u0003A\u0011BAI\u000f\u0015A\u0005\u0003#\u0001J\r\u0015y\u0001\u0003#\u0001L\u0011\u0015aE\u0002\"\u0001N\u000b\u0011qE\u0002A(\u0003)5\u000bG/\u001a:jC2L'0\u0019;j_:\u001cF/\u001a9t\u0015\t\t\"#A\u0002fi2T!a\u0005\u000b\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0005U1\u0012!C:ue\u0016\fW.\u001b8h\u0015\t9\u0002$A\u0003ta\u0006\u00148N\u0003\u0002\u001a5\u0005I1m\u001c8tk6,'o\u001d\u0006\u00037q\tAa^1ta*\u0011QDH\u0001\bE&<G-\u0019;b\u0015\ty\u0002%\u0001\u0005bO&dW\r\\1c\u0015\u0005\t\u0013AA5u\u0007\u0001\u0019\"\u0001\u0001\u0013\u0011\u0005\u0015BS\"\u0001\u0014\u000b\u0003\u001d\nQa]2bY\u0006L!!\u000b\u0014\u0003\r\u0005s\u0017PU3g\u0003\u0019!\u0013N\\5uIQ\tA\u0006\u0005\u0002&[%\u0011aF\n\u0002\u0005+:LG/A\u0007xe&$XM\u001d$bGR|'/_\u000b\u0002cA\u0011!G\u0004\b\u0003g-q!\u0001N$\u000f\u0005U2eB\u0001\u001cF\u001d\t9DI\u0004\u00029\u0007:\u0011\u0011H\u0011\b\u0003u\u0005s!a\u000f!\u000f\u0005qzT\"A\u001f\u000b\u0005y\u0012\u0013A\u0002\u001fs_>$h(C\u0001\"\u0013\ty\u0002%\u0003\u0002\u001e=%\u00111\u0004H\u0005\u00033iI!a\u0006\r\n\u0005U1\u0012BA\n\u0015\u0013\t\t\"#\u0001\u000bNCR,'/[1mSj\fG/[8o'R,\u0007o\u001d\t\u0003\u00152i\u0011\u0001E\n\u0003\u0019\u0011\na\u0001P5oSRtD#A%\u0003\u001b]\u0013\u0018\u000e^3s\r\u0006\u001cGo\u001c:z!\u0019)\u0003K\u0015-\\M&\u0011\u0011K\n\u0002\n\rVt7\r^5p]N\u0002\"a\u0015,\u000e\u0003QS!!\u0016\u000e\u0002\r5|G-\u001a7t\u0013\t9FKA\u000eTiJ,8\r^;sK\u0012\u001cFO]3b[&tw-\u0012+M\u001b>$W\r\u001c\t\u0003'fK!A\u0017+\u0003\u0017]\u0013\u0018\u000e^3s\u001b>$W\r\u001c\t\u00039\u0012l\u0011!\u0018\u0006\u0003=~\u000b1a]9m\u0015\t9\u0002M\u0003\u0002bE\u00061\u0011\r]1dQ\u0016T\u0011aY\u0001\u0004_J<\u0017BA3^\u00051\u0019\u0006/\u0019:l'\u0016\u001c8/[8o!\r)s-[\u0005\u0003Q\u001a\u0012aa\u00149uS>t\u0007C\u00016n\u001b\u0005Y'B\u00017\u0017\u0003\u001d9(/\u001b;feNL!A\\6\u0003=M\u0003\u0018M]6TiJ,8\r^;sK\u0012\u001cFO]3b[&twm\u0016:ji\u0016\u0014\u0018\u0001D:qCJ\\7+Z:tS>tW#A.\u0002\u00175\fG/\u001a:jC2L'0\u001a\u000b\u0006gz|\u0018\u0011\u0002\t\u0004i^LX\"A;\u000b\u0005Y4\u0013\u0001B;uS2L!\u0001_;\u0003\u0007Q\u0013\u0018\u0010\u0005\u0002{y6\t1P\u0003\u0002\u0016;&\u0011Qp\u001f\u0002\u000f'R\u0014X-Y7j]\u001e\fV/\u001a:z\u0011\u0015\tB\u00011\u0001S\u0011\u001d\t\t\u0001\u0002a\u0001\u0003\u0007\t\u0011\u0002]5qK\u001e\u0014\u0018\r\u001d5\u0011\u0007M\u000b)!C\u0002\u0002\bQ\u0013a\u0002U5qK\u001e\u0014\u0018\r\u001d5N_\u0012,G\u000eC\u0004\u0002\f\u0011\u0001\r!!\u0004\u0002\u0013\u0011\fG/\u0019$sC6,\u0007\u0003BA\b\u0003KqA!!\u0005\u0002\"9!\u00111CA\u0010\u001d\u0011\t)\"!\b\u000f\t\u0005]\u00111\u0004\b\u0004y\u0005e\u0011\"A2\n\u0005\u0005\u0014\u0017BA\fa\u0013\tqv,C\u0002\u0002$u\u000bq\u0001]1dW\u0006<W-\u0003\u0003\u0002(\u0005%\"!\u0003#bi\u00064%/Y7f\u0015\r\t\u0019#X\u0001\u0016O\u0016tWM]1uK\u000eCWmY6Q_&tG\u000fR5s)!\ty#!\u0011\u0002D\u0005M\u0003\u0003\u0002;x\u0003c\u0001B!a\r\u0002<9!\u0011QGA\u001c!\tad%C\u0002\u0002:\u0019\na\u0001\u0015:fI\u00164\u0017\u0002BA\u001f\u0003\u007f\u0011aa\u0015;sS:<'bAA\u001dM!)\u0011#\u0002a\u0001%\"9\u0011QI\u0003A\u0002\u0005\u001d\u0013\u0001F:qCJ\\7\u000b\u001e:fC6LgnZ\"p]\u001aLw\r\u0005\u0003\u0002J\u0005=SBAA&\u0015\r\ti\u0005V\u0001\u000eG>tg-[4ve\u0006$\u0018n\u001c8\n\t\u0005E\u00131\n\u0002\u001a'B\f'o[*ue\u0016\fW.\u001b8h\u0007>tg-[4N_\u0012,G\u000eC\u0004\u0002\u0002\u0015\u0001\r!a\u0001\u00029I,GO]5fm\u0016\u001c\u0006/\u0019:l'R\u0014X-Y7j]\u001e\u001cuN\u001c4jOV\u0011\u0011\u0011\f\t\u0005i^\f9%\u0001\u0007de\u0016\fG/Z,sSR,'\u000f\u0006\u0003\u0002`\u0005\u0005\u0004c\u0001;xS\")\u0011c\u0002a\u0001%\u0006)qO]5uKR1\u0011qMA;\u0003s\u0002B\u0001^<\u0002jA)!0a\u001b\u0002p%\u0019\u0011QN>\u0003!\u0011\u000bG/Y*ue\u0016\fWn\u0016:ji\u0016\u0014\bc\u0001/\u0002r%\u0019\u00111O/\u0003\u0007I{w\u000f\u0003\u0004\u0002x!\u0001\r![\u0001\u0007oJLG/\u001a:\t\u000f\u0005-\u0001\u00021\u0001\u0002\u000e\u0005Q1\u000f^1siF+XM]=\u0015\u0017M\fy(a!\u0002\b\u0006%\u0015Q\u0012\u0005\b\u0003\u0003K\u0001\u0019AA5\u0003A!\u0017\r^1TiJ,\u0017-\\,sSR,'\u000fC\u0004\u0002\u0006&\u0001\r!a\u0012\u0002\r\r|gNZ5h\u0011\u0015\t\u0012\u00021\u0001S\u0011\u001d\tY)\u0003a\u0001\u0003c\t\u0011\"];feft\u0015-\\3\t\u000f\u0005=\u0015\u00021\u0001\u00022\u0005i1\r[3dWB|\u0017N\u001c;ESJ$b!!\r\u0002\u0014\u0006U\u0005\"B\t\u000b\u0001\u0004\u0011\u0006bBA\u0001\u0015\u0001\u0007\u00111\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/streaming/actor/etl/MaterializationSteps.class */
public interface MaterializationSteps {
    Function3<StructuredStreamingETLModel, WriterModel, SparkSession, Option<SparkStructuredStreamingWriter>> writerFactory();

    SparkSession sparkSession();

    default Try<StreamingQuery> materialize(StructuredStreamingETLModel structuredStreamingETLModel, PipegraphModel pipegraphModel, Dataset<Row> dataset) {
        return retrieveSparkStreamingConfig().recoverWith(new MaterializationSteps$$anonfun$materialize$1(null, structuredStreamingETLModel)).flatMap(sparkStreamingConfigModel -> {
            return this.generateCheckPointDir(structuredStreamingETLModel, sparkStreamingConfigModel, pipegraphModel).recoverWith(new MaterializationSteps$$anonfun$$nestedInanonfun$materialize$2$1(null, structuredStreamingETLModel)).flatMap(str -> {
                return this.createWriter(structuredStreamingETLModel).recoverWith(new MaterializationSteps$$anonfun$$nestedInanonfun$materialize$3$1(null, structuredStreamingETLModel)).flatMap(sparkStructuredStreamingWriter -> {
                    return this.write(sparkStructuredStreamingWriter, dataset).recoverWith(new MaterializationSteps$$anonfun$$nestedInanonfun$materialize$4$1(null, structuredStreamingETLModel)).flatMap(dataStreamWriter -> {
                        return this.startQuery(dataStreamWriter, sparkStreamingConfigModel, structuredStreamingETLModel, this.queryName(structuredStreamingETLModel, pipegraphModel), str).recoverWith(new MaterializationSteps$$anonfun$$nestedInanonfun$materialize$5$1(null, structuredStreamingETLModel)).map(streamingQuery -> {
                            return streamingQuery;
                        });
                    });
                });
            });
        });
    }

    private default Try<String> generateCheckPointDir(StructuredStreamingETLModel structuredStreamingETLModel, SparkStreamingConfigModel sparkStreamingConfigModel, PipegraphModel pipegraphModel) {
        return Try$.MODULE$.apply(() -> {
            return SparkUtils$.MODULE$.generateSpecificStructuredStreamingCheckpointDir(pipegraphModel, structuredStreamingETLModel);
        });
    }

    private default Try<SparkStreamingConfigModel> retrieveSparkStreamingConfig() {
        return Try$.MODULE$.apply(() -> {
            return ConfigManager$.MODULE$.getSparkStreamingConfig();
        });
    }

    private default Try<SparkStructuredStreamingWriter> createWriter(StructuredStreamingETLModel structuredStreamingETLModel) {
        Success failure;
        boolean z = false;
        Success success = null;
        Failure apply = Try$.MODULE$.apply(() -> {
            return (Option) this.writerFactory().apply(structuredStreamingETLModel, structuredStreamingETLModel.streamingOutput(), this.sparkSession());
        });
        if (apply instanceof Success) {
            z = true;
            success = (Success) apply;
            Some some = (Option) success.value();
            if (some instanceof Some) {
                failure = new Success((SparkStructuredStreamingWriter) some.value());
                return failure;
            }
        }
        if (z) {
            if (None$.MODULE$.equals((Option) success.value())) {
                failure = new Failure(new Exception("Could not instantiate writer"));
                return failure;
            }
        }
        if (!(apply instanceof Failure)) {
            throw new MatchError(apply);
        }
        failure = new Failure(apply.exception());
        return failure;
    }

    private default Try<DataStreamWriter<Row>> write(SparkStructuredStreamingWriter sparkStructuredStreamingWriter, Dataset<Row> dataset) {
        return Try$.MODULE$.apply(() -> {
            return sparkStructuredStreamingWriter.write(dataset);
        });
    }

    private default Try<StreamingQuery> startQuery(DataStreamWriter<Row> dataStreamWriter, SparkStreamingConfigModel sparkStreamingConfigModel, StructuredStreamingETLModel structuredStreamingETLModel, String str, String str2) {
        return Try$.MODULE$.apply(() -> {
            return dataStreamWriter.queryName(str).option("checkpointLocation", str2).trigger(Trigger.ProcessingTime(SparkUtils$.MODULE$.getTriggerIntervalMs(sparkStreamingConfigModel, structuredStreamingETLModel))).start();
        });
    }

    private default String queryName(StructuredStreamingETLModel structuredStreamingETLModel, PipegraphModel pipegraphModel) {
        return BaseConsumersMasterGuadian$.MODULE$.generateUniqueComponentName(pipegraphModel, structuredStreamingETLModel);
    }

    static void $init$(MaterializationSteps materializationSteps) {
    }
}
