package it.agilelab.bigdata.wasp.consumers.spark.utils;

import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.core.utils.SparkStreamingConfiguration;
import it.agilelab.bigdata.wasp.core.utils.WaspConfiguration;
import it.agilelab.bigdata.wasp.models.PipegraphModel;
import it.agilelab.bigdata.wasp.models.StructuredStreamingETLModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.SparkConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.SparkStreamingConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.TelemetryConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.TelemetryTopicConfigModelMessageFormat$;
import it.agilelab.bigdata.wasp.models.configuration.WaspConfigModel;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.spark.SparkConf;
import org.apache.spark.UtilsForwarder$;
import scala.Array$;
import scala.MatchError;
import scala.Option$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.io.BufferedSource;
import scala.io.Codec$;
import scala.io.Source$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkUtils.scala */
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/utils/SparkUtils$.class */
public final class SparkUtils$ implements Logging, WaspConfiguration, ElasticConfiguration, SparkStreamingConfiguration {
    public static SparkUtils$ MODULE$;
    private final String jarsListFileName;
    private SparkStreamingConfigModel sparkStreamingConfig;
    private ElasticConfigModel elasticConfig;
    private WaspConfigModel waspConfig;
    private final WaspLogger logger;
    private volatile byte bitmap$0;

    static {
        new SparkUtils$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$] */
    private SparkStreamingConfigModel sparkStreamingConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                this.sparkStreamingConfig = SparkStreamingConfiguration.sparkStreamingConfig$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.sparkStreamingConfig;
    }

    public SparkStreamingConfigModel sparkStreamingConfig() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? sparkStreamingConfig$lzycompute() : this.sparkStreamingConfig;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$] */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                this.elasticConfig = ElasticConfiguration.elasticConfig$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.elasticConfig;
    }

    public ElasticConfigModel elasticConfig() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? elasticConfig$lzycompute() : this.elasticConfig;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [it.agilelab.bigdata.wasp.consumers.spark.utils.SparkUtils$] */
    private WaspConfigModel waspConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                this.waspConfig = WaspConfiguration.waspConfig$(this);
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.waspConfig;
    }

    public WaspConfigModel waspConfig() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? waspConfig$lzycompute() : this.waspConfig;
    }

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public String jarsListFileName() {
        return this.jarsListFileName;
    }

    public SparkConf buildSparkConfFromSparkConfigModel(SparkConfigModel sparkConfigModel, TelemetryConfigModel telemetryConfigModel, KafkaConfigModel kafkaConfigModel) {
        logger().info(() -> {
            return "Building Spark configuration from configuration model";
        });
        logger().info(() -> {
            return new StringBuilder(33).append("Starting from SparkConfigModel:\n\t").append(sparkConfigModel).toString();
        });
        SparkConf master = new SparkConf().setAppName(sparkConfigModel.appName()).setMaster(sparkConfigModel.master().toString());
        master.set("spark.submit.deployMode", sparkConfigModel.driver().submitDeployMode()).set("spark.driver.cores", BoxesRunTime.boxToInteger(sparkConfigModel.driver().cores()).toString()).set("spark.driver.memory", sparkConfigModel.driver().memory()).set("spark.driver.host", sparkConfigModel.driver().host()).set("spark.driver.bindAddress", sparkConfigModel.driver().bindAddress());
        if (sparkConfigModel.driver().port() != 0) {
            master.set("spark.driver.port", BoxesRunTime.boxToInteger(sparkConfigModel.driver().port()).toString());
        } else {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
        master.set("spark.executor.cores", BoxesRunTime.boxToInteger(sparkConfigModel.executorCores()).toString()).set("spark.executor.memory", sparkConfigModel.executorMemory()).set("spark.cores.max", BoxesRunTime.boxToInteger(sparkConfigModel.coresMax()).toString()).set("spark.executor.instances", BoxesRunTime.boxToInteger(sparkConfigModel.executorInstances()).toString()).setJars(getAdditionalJars(sparkConfigModel.additionalJarsPath())).set("spark.yarn.jars", sparkConfigModel.yarnJar()).set("spark.blockManager.port", BoxesRunTime.boxToInteger(sparkConfigModel.blockManagerPort()).toString()).set("spark.ui.retainedStages", BoxesRunTime.boxToInteger(sparkConfigModel.retained().retainedStagesJobs()).toString()).set("spark.ui.retainedTasks", BoxesRunTime.boxToInteger(sparkConfigModel.retained().retainedTasks()).toString()).set("spark.ui.retainedJobs", BoxesRunTime.boxToInteger(sparkConfigModel.retained().retainedJobs()).toString()).set("spark.sql.ui.retainedExecutions", BoxesRunTime.boxToInteger(sparkConfigModel.retained().retainedExecutions()).toString()).set("spark.streaming.ui.retainedBatches", BoxesRunTime.boxToInteger(sparkConfigModel.retained().retainedBatches()).toString()).setAll((Iterable) sparkConfigModel.others().map(sparkEntryConfig -> {
            return new Tuple2(sparkEntryConfig.key(), sparkEntryConfig.value());
        }, Seq$.MODULE$.canBuildFrom()));
        if (sparkConfigModel.kryoSerializer().enabled()) {
            master.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            master.registerKryoClasses((Class[]) Array$.MODULE$.apply(Nil$.MODULE$, ClassTag$.MODULE$.apply(Class.class)));
            master.set("spark.kryo.registrator", sparkConfigModel.kryoSerializer().registrators());
            master.set("spark.kryo.registrationRequired", BoxesRunTime.boxToBoolean(sparkConfigModel.kryoSerializer().strict()).toString());
            master.set("spark.kryo.registrationRequired", BoxesRunTime.boxToBoolean(sparkConfigModel.kryoSerializer().strict()).toString());
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        master.set("es.nodes", ((TraversableOnce) ((Seq) elasticConfig().connections().filter(connectionConfig -> {
            return BoxesRunTime.boxToBoolean($anonfun$buildSparkConfFromSparkConfigModel$4(connectionConfig));
        })).map(connectionConfig2 -> {
            return new StringBuilder(1).append(connectionConfig2.host()).append(":").append(connectionConfig2.port()).toString();
        }, Seq$.MODULE$.canBuildFrom())).mkString(","));
        String str = master.get("spark.executor.extraJavaOptions", "");
        master.set("spark.executor.extraJavaOptions", new StringBuilder(66).append(str).append(" -Dwasp.plugin.telemetry.kafka=\"").append(Base64.getUrlEncoder().encodeToString(TelemetryTopicConfigModelMessageFormat$.MODULE$.tinyKafkaConfigFormat().write(kafkaConfigModel.toTinyConfig()).toString().getBytes(StandardCharsets.UTF_8))).append("\" -Dwasp.plugin.telemetry.topic=\"").append(Base64.getUrlEncoder().encodeToString(TelemetryTopicConfigModelMessageFormat$.MODULE$.telemetryTopicConfigModelFormat().write(telemetryConfigModel.telemetryTopicConfigModel()).toString().getBytes(StandardCharsets.UTF_8))).append("\"").toString());
        if (sparkConfigModel instanceof SparkStreamingConfigModel) {
            ((SparkStreamingConfigModel) sparkConfigModel).nifiStateless().foreach(nifiStatelessConfigModel -> {
                if (nifiStatelessConfigModel == null) {
                    throw new MatchError(nifiStatelessConfigModel);
                }
                String bootstrapJars = nifiStatelessConfigModel.bootstrapJars();
                String systemJars = nifiStatelessConfigModel.systemJars();
                String statelessJars = nifiStatelessConfigModel.statelessJars();
                String extensions = nifiStatelessConfigModel.extensions();
                master.set("spark.wasp.nifi.lib.stateless", statelessJars);
                master.set("spark.wasp.nifi.lib.bootstrap", bootstrapJars);
                master.set("spark.wasp.nifi.lib.system", systemJars);
                master.set("spark.wasp.nifi.lib.extensions", extensions);
                return master.set("spark.executor.plugins", (String) Option$.MODULE$.apply(master.get("spark.executor.plugins", "")).filterNot(str2 -> {
                    return BoxesRunTime.boxToBoolean(str2.isEmpty());
                }).map(str3 -> {
                    return new StringBuilder(55).append(str3).append(",it.agilelab.bigdata.wasp.spark.plugins.nifi.NifiPlugin").toString();
                }).getOrElse(() -> {
                    return "it.agilelab.bigdata.wasp.spark.plugins.nifi.NifiPlugin";
                }));
            });
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        logger().info(() -> {
            return new StringBuilder(22).append("Resulting SparkConf:\n\t").append(master.toDebugString().replace("\n", "\n\t")).toString();
        });
        return master;
    }

    private Seq<String> getAdditionalJars(String str) {
        BufferedSource fromFile = Source$.MODULE$.fromFile(new StringBuilder(0).append(str).append(File.separator).append(jarsListFileName()).toString(), Codec$.MODULE$.fallbackSystemCodec());
        try {
            try {
                return fromFile.getLines().map(str2 -> {
                    return UtilsForwarder$.MODULE$.resolveURI(new StringBuilder(0).append(str).append(File.separator).append(str2).toString());
                }).map(uri -> {
                    return uri.toString();
                }).toVector();
            } catch (Throwable th) {
                String sb = new StringBuilder(68).append("Unable to completely generate the additional jars list - Exception: ").append(th.getMessage()).toString();
                logger().error(() -> {
                    return sb;
                }, th);
                throw th;
            }
        } finally {
            fromFile.close();
        }
    }

    public String generateSpecificStructuredStreamingCheckpointDir(PipegraphModel pipegraphModel, StructuredStreamingETLModel structuredStreamingETLModel) {
        String environmentPrefix = waspConfig().environmentPrefix();
        return new StringBuilder(24).append(sparkStreamingConfig().checkpointDir()).append((environmentPrefix != null ? !environmentPrefix.equals("") : "" != 0) ? new StringBuilder(1).append("/").append(waspConfig().environmentPrefix()).toString() : "").append("/").append("structured_streaming").append("/").append(pipegraphModel.generateStandardPipegraphName()).append("/").append(structuredStreamingETLModel.generateStandardProcessingComponentName()).append("_").append(structuredStreamingETLModel.generateStandardWriterName()).toString();
    }

    public long getTriggerIntervalMs(SparkStreamingConfigModel sparkStreamingConfigModel, StructuredStreamingETLModel structuredStreamingETLModel) {
        return BoxesRunTime.unboxToLong(structuredStreamingETLModel.triggerIntervalMs().orElse(() -> {
            return sparkStreamingConfigModel.triggerIntervalMs();
        }).getOrElse(() -> {
            return 0L;
        }));
    }

    public static final /* synthetic */ boolean $anonfun$buildSparkConfFromSparkConfigModel$4(ConnectionConfig connectionConfig) {
        Object orElse = connectionConfig.metadata().flatMap(map -> {
            return map.get("connectiontype");
        }).getOrElse(() -> {
            return "";
        });
        return orElse != null ? orElse.equals("rest") : "rest" == 0;
    }

    private SparkUtils$() {
        MODULE$ = this;
        Logging.$init$(this);
        WaspConfiguration.$init$(this);
        ElasticConfiguration.$init$(this);
        SparkStreamingConfiguration.$init$(this);
        this.jarsListFileName = "jars.list";
    }
}
