package org.flinkextended.flink.ml.operator.util;

import com.google.common.base.Joiner;
import java.io.IOException;
import java.net.URI;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.node.MLContext;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/util/PythonFileUtil.class */
/**
 * Static helpers that register a job's Python files with the Flink distributed cache and
 * copy them out of the cache again on the executing node.
 *
 * <p>Files are keyed in the cache by a short name: the URI fragment when the file URI carries
 * one, otherwise the file name of the path.
 */
public class PythonFileUtil {
    /** Separator used to pack the list of registered file names into a single property value. */
    private static final String SPLITTER = ",";

    /**
     * Records the job's Python files in the given configuration.
     *
     * <p>When the configuration contains the {@code remote_code_zip_file} property, nothing is
     * registered with the distributed cache; the first configured Python file is stored under
     * {@code user_entry_python_file} instead. Otherwise every configured Python file is
     * registered as a cached file and the resulting cache names are stored, comma-joined, under
     * {@code python_files}.
     *
     * @param streamExecutionEnvironment environment whose cached-file list receives the files
     * @param mLConfig configuration that supplies the Python files and receives the property
     * @throws IOException declared for callers; propagated from file registration
     */
    public static void registerPythonFiles(StreamExecutionEnvironment streamExecutionEnvironment, MLConfig mLConfig) throws IOException {
        if (mLConfig.getProperties().containsKey("remote_code_zip_file")) {
            mLConfig.addProperty("user_entry_python_file", mLConfig.getPythonFiles()[0]);
            return;
        }
        List<String> registeredNames = registerPythonLibFilesIfNotExist(streamExecutionEnvironment, mLConfig.getPythonFiles());
        mLConfig.addProperty("python_files", Joiner.on(SPLITTER).join(registeredNames));
    }

    /**
     * Copies the Python files named in the {@code python_files} property from the distributed
     * cache into a fresh temporary directory and records that directory and the file names on
     * the context.
     *
     * <p>Does nothing when the context does not use the distributed cache or when the
     * {@code python_files} property is absent or empty.
     *
     * @param runtimeContext runtime context that provides the distributed cache
     * @param mLContext context that supplies the property, creates the temporary directory and
     *     receives the resulting Python directory and file names
     * @throws IOException if the temporary directory cannot be created or a file cannot be copied
     */
    public static void preparePythonFilesForExec(RuntimeContext runtimeContext, MLContext mLContext) throws IOException {
        if (!mLContext.useDistributeCache()) {
            return;
        }
        String joinedNames = (String) mLContext.getProperties().get("python_files");
        if (StringUtils.isEmpty(joinedNames)) {
            return;
        }
        String[] fileNames = joinedNames.split(SPLITTER);
        DistributedCache distributedCache = runtimeContext.getDistributedCache();
        Path workDir = mLContext.createTempDir("ml_on_flink_", new FileAttribute<?>[0]);
        for (String fileName : fileNames) {
            // Each cached file keeps its cache name inside the working directory.
            Files.copy(distributedCache.getFile(fileName).toPath(), workDir.resolve(fileName));
        }
        mLContext.setPythonDir(workDir);
        mLContext.setPythonFiles(fileNames);
    }

    /**
     * Registers the given files with the environment's cached-file list, skipping names that
     * are already registered with the same path.
     *
     * @param streamExecutionEnvironment environment whose cached-file list is updated
     * @param strArr file paths or URIs; an entry without a scheme is treated as a local path
     * @return the cache names of the files, in the order the files were given
     * @throws IOException declared for callers
     * @throws IllegalStateException if a name is already registered with a different path
     */
    public static List<String> registerPythonLibFilesIfNotExist(StreamExecutionEnvironment streamExecutionEnvironment, String... strArr) throws IOException {
        Tuple2<Map<String, URI>, List<String>> converted = convertFiles(strArr);
        converted.f0.forEach((name, uri) ->
                registerCachedFileIfNotExist(streamExecutionEnvironment.getCachedFiles(), uri.getPath(), name));
        return converted.f1;
    }

    /**
     * Resolves each file string to a URI and derives its cache name.
     *
     * @param files file paths or URIs
     * @return a tuple of (cache name to URI map, cache names in input order)
     */
    private static Tuple2<Map<String, URI>, List<String>> convertFiles(String... files) {
        Map<String, URI> uriByName = new HashMap<>();
        List<String> names = new ArrayList<>();
        for (String file : files) {
            URI uri = URI.create(file);
            if (uri.getScheme() == null) {
                // No scheme: interpret the string as a local file system path.
                uri = Paths.get(file).toUri();
            }
            // A URI fragment, when present, overrides the file name as the cache name.
            String name = uri.getFragment() != null
                    ? uri.getFragment()
                    : Paths.get(uri).getFileName().toString();
            uriByName.put(name, uri);
            names.add(name);
        }
        return new Tuple2<>(uriByName, names);
    }

    /**
     * Adds a non-executable cache entry for {@code str2} pointing at {@code str}, unless an
     * entry with that key already exists.
     *
     * <p>If the key already exists, the first matching entry must point at the same path.
     *
     * @param list cached-file list to inspect and, if needed, append to
     * @param str file path of the entry
     * @param str2 cache key (name) of the entry
     * @throws IllegalStateException if the key is already registered with a different path
     */
    public static void registerCachedFileIfNotExist(List<Tuple2<String, DistributedCache.DistributedCacheEntry>> list, String str, String str2) {
        for (Tuple2<String, DistributedCache.DistributedCacheEntry> entry : list) {
            if (entry.f0.equals(str2)) {
                Preconditions.checkState(entry.f1.filePath.equals(str), "Fail to register cache file with key %s file path %s, the same key has been registered with path %s", str2, str, entry.f1.filePath);
                return;
            }
        }
        list.add(new Tuple2<>(str2, new DistributedCache.DistributedCacheEntry(str, false)));
    }
}
