package org.flinkextended.flink.ml.operator.ops;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.PythonDependencyUtils;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/PythonEnvironmentManagerTest.class */
public class PythonEnvironmentManagerTest {
    private StreamingRuntimeContext context;
    private Configuration flinkConfig;
    static final /* synthetic */ boolean $assertionsDisabled;

    @Before
    public void setUp() throws Exception {
        this.context = (StreamingRuntimeContext) Mockito.mock(StreamingRuntimeContext.class);
        TaskManagerRuntimeInfo taskManagerRuntimeInfo = (TaskManagerRuntimeInfo) Mockito.mock(TaskManagerRuntimeInfo.class);
        Mockito.when(taskManagerRuntimeInfo.getTmpDirectories()).thenReturn(new String[]{"/tmp"});
        Mockito.when(this.context.getTaskManagerRuntimeInfo()).thenReturn(taskManagerRuntimeInfo);
        Mockito.when(this.context.getJobId()).thenReturn(new JobID());
        this.flinkConfig = new Configuration();
    }

    @Test
    public void testArchiveZipFile() throws Exception {
        ClusterConfig build = ClusterConfig.newBuilder().setNodeEntry("/tmp/test.py", "main").build();
        this.flinkConfig.set(PythonDependencyUtils.PYTHON_ARCHIVES, Collections.singletonMap("data", "data.zip"));
        PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(build, this.flinkConfig);
        URL resource = Thread.currentThread().getContextClassLoader().getResource("data.zip");
        if (!$assertionsDisabled && resource == null) {
            throw new AssertionError();
        }
        Mockito.when(this.context.getDistributedCache()).thenReturn(new DistributedCache(Collections.singletonMap("data", CompletableFuture.completedFuture(new Path(resource.getPath())))));
        pythonEnvironmentManager.open(this.context);
        Assert.assertTrue(Files.exists(Paths.get(pythonEnvironmentManager.getWorkingDirectory(), "data.zip", "data.txt"), new LinkOption[0]));
    }

    @Test
    public void testNoPyFlinkConfig() throws Exception {
        PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(ClusterConfig.newBuilder().setNodeEntry("/tmp/test.py", "main").build(), this.flinkConfig);
        pythonEnvironmentManager.open(this.context);
        Assert.assertEquals(System.getProperty("user.dir"), pythonEnvironmentManager.getWorkingDirectory());
        Assert.assertEquals(":", pythonEnvironmentManager.getPythonPath());
        Assert.assertEquals("python", pythonEnvironmentManager.getPythonExec());
    }

    @Test
    public void testPythonFiles() throws Exception {
        ClusterConfig build = ClusterConfig.newBuilder().setNodeEntry("/tmp/test.py", "main").build();
        HashMap hashMap = new HashMap();
        hashMap.put("row_input.py", "row_input.py");
        hashMap.put("greeter.zip", "greeter.zip");
        this.flinkConfig.set(PythonDependencyUtils.PYTHON_FILES, hashMap);
        PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(build, this.flinkConfig);
        URL resource = Thread.currentThread().getContextClassLoader().getResource("row_input.py");
        URL resource2 = Thread.currentThread().getContextClassLoader().getResource("greeter.zip");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("row_input.py", CompletableFuture.completedFuture(new Path(((URL) Objects.requireNonNull(resource)).getPath())));
        hashMap2.put("greeter.zip", CompletableFuture.completedFuture(new Path(((URL) Objects.requireNonNull(resource2)).getPath())));
        Mockito.when(this.context.getDistributedCache()).thenReturn(new DistributedCache(hashMap2));
        pythonEnvironmentManager.open(this.context);
        Assert.assertTrue(checkInPythonPath("row_input.py", pythonEnvironmentManager.getPythonPath()));
        Assert.assertTrue(checkInPythonPath("greeter.py", pythonEnvironmentManager.getPythonPath()));
    }

    @Test
    public void testRequirements() throws Exception {
        ClusterConfig build = ClusterConfig.newBuilder().setNodeEntry("/tmp/test.py", "main").build();
        this.flinkConfig.set(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, Collections.singletonMap("file", "requirements.txt"));
        PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(build, this.flinkConfig);
        URL resource = Thread.currentThread().getContextClassLoader().getResource("requirements.txt");
        if (!$assertionsDisabled && resource == null) {
            throw new AssertionError();
        }
        Mockito.when(this.context.getDistributedCache()).thenReturn(new DistributedCache(Collections.singletonMap("requirements.txt", CompletableFuture.completedFuture(new Path(resource.getPath())))));
        pythonEnvironmentManager.open(this.context);
        checkInPythonPath("dl_on_flink_framework/cluster_config.py", pythonEnvironmentManager.getPythonPath());
        this.flinkConfig.set(PythonDependencyUtils.PYTHON_REQUIREMENTS_FILE, Collections.singletonMap("file", "requirements.txt"));
    }

    @Test
    public void testPythonExec() throws Exception {
        ClusterConfig build = ClusterConfig.newBuilder().setNodeEntry("/tmp/test.py", "main").build();
        this.flinkConfig.set(PythonOptions.PYTHON_EXECUTABLE, "/my_python");
        PythonEnvironmentManager pythonEnvironmentManager = new PythonEnvironmentManager(build, this.flinkConfig);
        pythonEnvironmentManager.open(this.context);
        Assert.assertEquals("/my_python", pythonEnvironmentManager.getPythonExec());
    }

    private boolean checkInPythonPath(String str, String str2) {
        return Arrays.stream(str2.split(":")).anyMatch(str3 -> {
            return Files.exists(Paths.get(str3, str), new LinkOption[0]);
        });
    }

    static {
        $assertionsDisabled = !PythonEnvironmentManagerTest.class.desiredAssertionStatus();
    }
}
