package org.apache.spark.python;

import java.io.File;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkUserAppException;
import org.apache.spark.api.python.PythonUtils$;
import org.apache.spark.deploy.PythonRunner$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.package$;
import org.apache.spark.python.PythonHelper;
import org.apache.spark.util.RedirectThread;
import org.apache.spark.util.RedirectThread$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import py4j.GatewayServer;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.ArrayOps$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.ArrayBuffer;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyRef;

/* compiled from: PythonHelper.scala */
/* loaded from: input_file:org/apache/spark/python/PythonHelper$.class */
public final class PythonHelper$ implements Logging {
    public static final PythonHelper$ MODULE$ = new PythonHelper$();
    private static transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        Logging.$init$(MODULE$);
    }

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        org$apache$spark$internal$Logging$$log_ = logger;
    }

    public <T extends PythonHelper.SparkEntryPoint> void exec(T t, String str) {
        File createTempFile = File.createTempFile("pythontransform", ".py");
        Seq seq = Nil$.MODULE$;
        SparkConf conf = t.session().sparkContext().getConf();
        String createSecret = Utils$.MODULE$.createSecret(conf);
        String str2 = (String) ((Option) conf.get(package$.MODULE$.PYSPARK_DRIVER_PYTHON())).orElse(() -> {
            return (Option) conf.get(package$.MODULE$.PYSPARK_PYTHON());
        }).orElse(() -> {
            return scala.sys.package$.MODULE$.env().get("PYSPARK_DRIVER_PYTHON");
        }).orElse(() -> {
            return scala.sys.package$.MODULE$.env().get("PYSPARK_PYTHON");
        }).getOrElse(() -> {
            return "python";
        });
        PrintWriter printWriter = new PrintWriter(createTempFile);
        printWriter.write(str);
        printWriter.close();
        createTempFile.deleteOnExit();
        String formatPath = PythonRunner$.MODULE$.formatPath(createTempFile.getAbsolutePath(), PythonRunner$.MODULE$.formatPath$default$2());
        String[] resolvePyFiles = resolvePyFiles(PythonRunner$.MODULE$.formatPaths("", PythonRunner$.MODULE$.formatPaths$default$2()));
        InetAddress loopbackAddress = InetAddress.getLoopbackAddress();
        final GatewayServer build = new GatewayServer.GatewayServerBuilder().authToken(createSecret).javaPort(0).javaAddress(loopbackAddress).callbackClient(25334, loopbackAddress, createSecret).entryPoint(t).build();
        Thread thread = new Thread(new Runnable(build) { // from class: org.apache.spark.python.PythonHelper$$anon$1
            private final GatewayServer gatewayServer$1;

            @Override // java.lang.Runnable
            public void run() {
                Utils$.MODULE$.logUncaughtExceptions(() -> {
                    this.gatewayServer$1.start();
                });
            }

            {
                this.gatewayServer$1 = build;
            }
        });
        thread.setName("py4j-gateway-init");
        thread.setDaemon(true);
        thread.start();
        thread.join();
        logInfo(() -> {
            return "py4j gateway started";
        });
        ArrayBuffer arrayBuffer = new ArrayBuffer();
        arrayBuffer.$plus$plus$eq(Predef$.MODULE$.wrapRefArray(resolvePyFiles));
        arrayBuffer.$plus$eq(PythonUtils$.MODULE$.sparkPythonPath());
        arrayBuffer.$plus$eq(scala.sys.package$.MODULE$.env().getOrElse("PYTHONPATH", () -> {
            return "";
        }));
        String mergePythonPaths = PythonUtils$.MODULE$.mergePythonPaths(arrayBuffer.toSeq());
        ProcessBuilder processBuilder = new ProcessBuilder((List<String>) CollectionConverters$.MODULE$.SeqHasAsJava((scala.collection.Seq) new $colon.colon(str2, new $colon.colon(formatPath, Nil$.MODULE$)).$plus$plus(seq)).asJava());
        Map<String, String> environment = processBuilder.environment();
        environment.put("PYTHONPATH", mergePythonPaths);
        environment.put("PYTHONUNBUFFERED", "YES");
        environment.put("PYSPARK_GATEWAY_PORT", String.valueOf(BoxesRunTime.boxToInteger(build.getListeningPort())));
        environment.put("PYSPARK_GATEWAY_SECRET", createSecret);
        ((Option) conf.get(package$.MODULE$.PYSPARK_PYTHON())).foreach(str3 -> {
            return (String) environment.put("PYSPARK_PYTHON", str3);
        });
        scala.sys.package$.MODULE$.env().get("PYTHONHASHSEED").foreach(str4 -> {
            return (String) environment.put("PYTHONHASHSEED", str4);
        });
        processBuilder.redirectErrorStream(true);
        logInfo(() -> {
            return "starting python process";
        });
        try {
            Process start = processBuilder.start();
            new RedirectThread(start.getInputStream(), System.out, "redirect output", RedirectThread$.MODULE$.$lessinit$greater$default$4()).start();
            int waitFor = start.waitFor();
            if (waitFor != 0) {
                throw new SparkUserAppException(waitFor);
            }
            logInfo(() -> {
                return "python process ended successfully";
            });
        } finally {
            build.shutdown();
        }
    }

    private String[] resolvePyFiles(String[] strArr) {
        LazyRef lazyRef = new LazyRef();
        return (String[]) ArrayOps$.MODULE$.distinct$extension(Predef$.MODULE$.refArrayOps((Object[]) ArrayOps$.MODULE$.flatMap$extension(Predef$.MODULE$.refArrayOps(strArr), str -> {
            if (!str.endsWith(".py")) {
                return new Some(str);
            }
            File file = new File(str);
            if (!file.exists() || !file.isFile() || !file.canRead()) {
                return None$.MODULE$;
            }
            Files.copy(file.toPath(), new File(dest$1(lazyRef), file.getName()).toPath(), new CopyOption[0]);
            return new Some(dest$1(lazyRef).getAbsolutePath());
        }, ClassTag$.MODULE$.apply(String.class))));
    }

    private static final /* synthetic */ File dest$lzycompute$1(LazyRef lazyRef) {
        File file;
        File file2;
        synchronized (lazyRef) {
            if (lazyRef.initialized()) {
                file = (File) lazyRef.value();
            } else {
                file = (File) lazyRef.initialize(Utils$.MODULE$.createTempDir(Utils$.MODULE$.createTempDir$default$1(), "localPyFiles"));
            }
            file2 = file;
        }
        return file2;
    }

    private static final File dest$1(LazyRef lazyRef) {
        return lazyRef.initialized() ? (File) lazyRef.value() : dest$lzycompute$1(lazyRef);
    }

    private PythonHelper$() {
    }
}
