package edu.iu.dsc.tws.rsched.schedulers.k8s.logger;

import com.google.gson.reflect.TypeToken;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesController;
import edu.iu.dsc.tws.rsched.schedulers.k8s.KubernetesUtils;
import edu.iu.dsc.tws.rsched.schedulers.k8s.worker.K8sWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.FileUtils;
import io.kubernetes.client.openapi.ApiCallback;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:edu/iu/dsc/tws/rsched/schedulers/k8s/logger/JobLogger.class */
/**
 * Background thread that watches the pods of a job in a Kubernetes namespace and starts a
 * {@link WorkerLogger} for every container of every pod that reaches the "Running" phase.
 *
 * <p>Log files are written under {@code <user.home>/.twister2/<jobId>}.
 *
 * <p>Threading: {@link #run()} executes on this thread, while
 * {@link #workerLoggerCompleted(String)} and {@link #stopLogger()} may be invoked from other
 * threads. State shared between them is either {@code volatile}, a concurrent collection, or
 * guarded by a lock (the {@code loggers} list by its own monitor, the {@code watcher} field by
 * this object's monitor).
 */
public class JobLogger extends Thread {
    private static final Logger LOG = Logger.getLogger(JobLogger.class.getName());

    private final JobAPI.Job job;
    private final String namespace;
    private String logsDir;
    private CoreV1Api v1Api;

    /** Currently open pod watch, or {@code null}. Guarded by {@code this}. */
    private Watch<V1Pod> watcher;

    /** Set by {@link #stopLogger()}; read by the watcher thread, hence volatile. */
    private volatile boolean stopLogger = false;

    /** Loggers started so far. Guarded by its own monitor. */
    private final List<WorkerLogger> loggers = new LinkedList<>();

    private final Set<String> completedLoggers = new ConcurrentSkipListSet<>();

    /**
     * Expected number of workers. Initialised from the job and raised by the watcher thread
     * when a pod with a higher worker id shows up; read by completion callbacks.
     */
    private volatile int numberOfWorkers;

    /**
     * @param str the Kubernetes namespace in which the job pods are watched
     * @param job the job whose pods are logged
     */
    public JobLogger(String str, JobAPI.Job job) {
        this.namespace = str;
        this.job = job;
        this.numberOfWorkers = job.getNumberOfWorkers();
    }

    /**
     * Creates the API client and the logs directory (if missing), then blocks watching the
     * job pods until the watch ends or the logger is stopped.
     */
    @Override
    public void run() {
        this.v1Api = KubernetesController.createCoreV1Api();
        this.logsDir = System.getProperty("user.home") + "/.twister2/" + this.job.getJobId();
        if (!FileUtils.isDirectoryExists(this.logsDir)) {
            FileUtils.createDirectory(this.logsDir);
        }
        LOG.info("Job logs directory: " + this.logsDir);
        watchPodsToRunningStartLoggers();
    }

    /**
     * Records that the logger with the given id has finished. When the number of completed
     * loggers equals {@code numberOfWorkers + 1}, the whole job logger is stopped. The extra
     * one presumably accounts for the job master pod, which also gets a logger.
     *
     * @param str id of the completed logger
     */
    public synchronized void workerLoggerCompleted(String str) {
        this.completedLoggers.add(str);
        if (this.completedLoggers.size() == this.numberOfWorkers + 1) {
            stopLogger();
            LOG.info("All workers completed. Job has finished.");
        }
    }

    /**
     * Opens a watch on the job pods and starts loggers for pods as they become running.
     * The watch is always closed on exit, including when the loop fails with an exception.
     *
     * @throws RuntimeException wrapping the {@link ApiException} if the watch cannot be
     *     created, or rethrown from the watch iteration when the logger was not stopped
     */
    private void watchPodsToRunningStartLoggers() {
        try {
            // Iterate over a local reference: the field may be nulled concurrently by
            // stopLogger() -> closeWatcher().
            Watch<V1Pod> podWatch = Watch.createWatch(
                KubernetesController.getApiClient(),
                this.v1Api.listNamespacedPodCall(this.namespace, null, null, null, null,
                    KubernetesUtils.createJobPodsLabelWithKey(this.job.getJobId()),
                    null, null, Integer.MAX_VALUE, Boolean.TRUE, null),
                new TypeToken<Watch.Response<V1Pod>>() {
                }.getType());

            synchronized (this) {
                this.watcher = podWatch;
            }

            try {
                // If stop was requested before the watch was published, skip the loop;
                // the finally clause closes the watch.
                if (!this.stopLogger) {
                    for (Watch.Response<V1Pod> response : podWatch) {
                        if (this.stopLogger) {
                            break;
                        }
                        V1Pod pod = response.object;
                        if (isRunningJobPod(pod)) {
                            startLoggersForPod(pod);
                        }
                    }
                }
            } catch (RuntimeException e) {
                // Closing the watch from another thread makes the iteration fail; that is
                // expected once a stop has been requested.
                if (!this.stopLogger) {
                    throw e;
                }
                LOG.fine("JobLogger is stopped.");
            } finally {
                closeWatcher();
            }
        } catch (ApiException e2) {
            LOG.log(Level.SEVERE, "Exception when watching the pods to get the IPs: \nexCode: " + e2.getCode() + "\nresponseBody: " + e2.getResponseBody(), e2);
            throw new RuntimeException(e2);
        }
    }

    /**
     * Tells whether the pod belongs to this job (its name starts with the job id), is in the
     * "Running" phase and is not being deleted. Pods with missing metadata, name or status
     * are treated as not matching instead of causing a NullPointerException.
     */
    private boolean isRunningJobPod(V1Pod pod) {
        if (pod == null || pod.getMetadata() == null || pod.getStatus() == null) {
            return false;
        }
        String podName = pod.getMetadata().getName();
        return podName != null
            && podName.startsWith(this.job.getJobId())
            && "Running".equals(pod.getStatus().getPhase())
            && pod.getMetadata().getDeletionTimestamp() == null;
    }

    /**
     * Starts the logger(s) for a running job pod: a single logger on the first container for
     * the job master pod (name ending with "-jm-0"), otherwise one logger per container.
     * The caller must have verified the pod with {@link #isRunningJobPod(V1Pod)}.
     */
    private void startLoggersForPod(V1Pod pod) {
        if (pod.getSpec() == null) {
            return;
        }
        List<V1Container> containers = pod.getSpec().getContainers();
        if (containers == null || containers.isEmpty()) {
            return;
        }

        String podName = pod.getMetadata().getName();
        String podIP = pod.getStatus().getPodIP();

        if (podName.endsWith("-jm-0")) {
            startWorkerLogger(new WorkerLogger(this.namespace, podName, containers.get(0).getName(), "job-master-ip" + podIP, this.logsDir, this.v1Api, this));
            return;
        }

        for (V1Container container : containers) {
            int workerID = K8sWorkerUtils.calculateWorkerID(this.job, podName, container.getName());
            // A worker id beyond the known count means the job has more workers than recorded.
            if (workerID >= this.numberOfWorkers) {
                this.numberOfWorkers = workerID + 1;
            }
            startWorkerLogger(new WorkerLogger(this.namespace, podName, container.getName(), "worker" + workerID + "-ip" + podIP, this.logsDir, this.v1Api, this));
        }
    }

    /**
     * Starts the given logger unless an equal logger is already running or a stop has been
     * requested. A finished equal logger is dropped from the list first, so that later
     * lookups find the live replacement rather than the dead entry.
     */
    private void startWorkerLogger(WorkerLogger workerLogger) {
        synchronized (this.loggers) {
            // Checked under the same lock stopLogger() snapshots with, so a logger is either
            // in the snapshot (and gets stopped) or never started.
            if (this.stopLogger) {
                return;
            }

            // NOTE(review): matching relies on WorkerLogger.equals, which is not visible here.
            int existing = this.loggers.indexOf(workerLogger);
            if (existing >= 0) {
                if (this.loggers.get(existing).isAlive()) {
                    LOG.info("Ignoring " + workerLogger.getID() + " start event for logging, since a logger for that worker is already running.");
                    return;
                }
                this.loggers.remove(existing);
            }

            // Clear a stale completion marker BEFORE starting, so that a completion reported
            // by the new logger right after start is not erased.
            this.completedLoggers.removeIf(id -> id.equals(workerLogger.getID()));
            this.loggers.add(workerLogger);
            workerLogger.start();
        }
    }

    /**
     * Closes the pod watch if one is open. Safe to call from several threads: the field is
     * detached under the lock, so the watch is closed at most once.
     */
    private void closeWatcher() {
        Watch<V1Pod> toClose;
        synchronized (this) {
            toClose = this.watcher;
            this.watcher = null;
        }
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Exception closing watcher.", (Throwable) e);
        }
    }

    /**
     * Requests the watcher loop to end, closes the pod watch and stops every logger that is
     * still alive.
     */
    public void stopLogger() {
        this.stopLogger = true;
        closeWatcher();

        // Snapshot under the lock, then stop outside it to avoid calling into the loggers
        // while holding the list monitor.
        List<WorkerLogger> snapshot;
        synchronized (this.loggers) {
            snapshot = new LinkedList<>(this.loggers);
        }
        for (WorkerLogger workerLogger : snapshot) {
            if (workerLogger.isAlive()) {
                workerLogger.stopLogging();
            }
        }
    }
}
