package edu.iu.dsc.tws.master.server;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.common.zk.WorkerWithState;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEphemStateManager;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKPersStateManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.master.JobMasterContext;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.nodes.PersistentNode;
import org.apache.curator.utils.CloseableUtils;

/* loaded from: input_file:edu/iu/dsc/tws/master/server/ZKMasterController.class */
public class ZKMasterController {
    public static final Logger LOG = Logger.getLogger(ZKMasterController.class.getName());
    protected int numberOfWorkers;
    protected String jobID;
    protected Config config;
    protected String rootPath;
    protected String persDir;
    protected String ephemDir;
    protected String barrierDir;
    private String jmAddress;
    protected CuratorFramework client;
    protected PathChildrenCache ephemChildrenCache;
    protected PathChildrenCache persChildrenCache;
    protected PathChildrenCache barrierChildrenCache;
    private PersistentNode masterEphemZNode;
    private List<Integer> scaledDownWorkers = new LinkedList();
    private WorkerMonitor workerMonitor;

    /* renamed from: edu.iu.dsc.tws.master.server.ZKMasterController$4, reason: invalid class name */
    /* loaded from: input_file:edu/iu/dsc/tws/master/server/ZKMasterController$4.class */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_UPDATED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public ZKMasterController(Config config, String str, int i, String str2, WorkerMonitor workerMonitor) {
        this.config = config;
        this.jobID = str;
        this.numberOfWorkers = i;
        this.jmAddress = str2;
        this.workerMonitor = workerMonitor;
        this.rootPath = ZKContext.rootNode(config);
        this.persDir = ZKUtils.persDir(this.rootPath, str);
        this.ephemDir = ZKUtils.ephemDir(this.rootPath, str);
        this.barrierDir = ZKUtils.barrierDir(this.rootPath, str);
    }

    public void initialize(JobMasterAPI.JobMasterState jobMasterState) throws Twister2Exception {
        if (jobMasterState != JobMasterAPI.JobMasterState.JM_STARTED && jobMasterState != JobMasterAPI.JobMasterState.JM_RESTARTED) {
            throw new Twister2Exception("initialState has to be either JobMasterState.JM_STARTED or JobMasterState.JM_RESTARTED. Supplied value: " + jobMasterState);
        }
        try {
            this.client = ZKUtils.connectToServer(ZKContext.serverAddresses(this.config), FaultToleranceContext.sessionTimeout(this.config));
            if (jobMasterState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
                initRestarting();
            } else {
                this.ephemChildrenCache = new PathChildrenCache(this.client, this.ephemDir, true);
                addEphemChildrenCacheListener(this.ephemChildrenCache);
                this.ephemChildrenCache.start();
                this.persChildrenCache = new PathChildrenCache(this.client, this.persDir, true);
                addPersChildrenCacheListener(this.persChildrenCache);
                this.persChildrenCache.start();
            }
            this.barrierChildrenCache = new PathChildrenCache(this.client, this.barrierDir, true);
            addBarrierChildrenCacheListener(this.barrierChildrenCache);
            this.barrierChildrenCache.start();
            LOG.info("Job Master: " + this.jmAddress + " initialized successfully.");
        } catch (Exception e) {
            throw new Twister2Exception("Exception when initializing ZKMasterController.", e);
        } catch (Twister2Exception e2) {
            throw e2;
        }
    }

    private void initRestarting() throws Exception {
        LOG.info("Job Master restarting .... ");
        this.ephemChildrenCache = new PathChildrenCache(this.client, this.ephemDir, true);
        addEphemChildrenCacheListener(this.ephemChildrenCache);
        this.ephemChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        List currentData = this.ephemChildrenCache.getCurrentData();
        LOG.info("Initially existing workers: " + currentData.size());
        this.persChildrenCache = new PathChildrenCache(this.client, this.persDir, true);
        addPersChildrenCacheListener(this.persChildrenCache);
        this.persChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        LinkedList linkedList = new LinkedList();
        Iterator it = currentData.iterator();
        while (it.hasNext()) {
            String path = ((ChildData) it.next()).getPath();
            WorkerWithState workerWithState = getWorkerWithState(ZKUtils.getWorkerIDFromEphemPath(path));
            if (workerWithState != null) {
                linkedList.add(workerWithState);
            } else {
                LOG.severe("worker[" + path + "] added, but its data can not be retrieved.");
            }
        }
        publishJobMasterRestarted();
        if (!this.workerMonitor.addJoinedWorkers(linkedList) || allJoinedPublished()) {
            return;
        }
        LOG.info("Publishing AllJoined event when restarting, since it is not previously published.");
        publishAllJoined();
    }

    private boolean allJoinedPublished() throws Exception {
        for (JobMasterAPI.JobEvent jobEvent : ZKEventsManager.getAllEvents(this.client, this.rootPath, this.jobID).values()) {
            if (jobEvent.hasAllJoined()) {
                return jobEvent.getAllJoined().getNumberOfWorkers() == this.numberOfWorkers;
            }
        }
        return false;
    }

    public void jobScaledUp(int i) {
        this.numberOfWorkers = i;
    }

    public void jobScaledDown(int i) {
        this.scaledDownWorkers = new LinkedList();
        for (int i2 = i; i2 < this.numberOfWorkers; i2++) {
            this.scaledDownWorkers.add(Integer.valueOf(i2));
        }
        this.numberOfWorkers = i;
    }

    private void createJMEphemZnode(JobMasterAPI.JobMasterState jobMasterState) {
        this.masterEphemZNode = ZKUtils.createPersistentEphemeralZnode(ZKUtils.jmEphemPath(this.rootPath, this.jobID), ZKUtils.encodeJobMasterZnode(this.jmAddress, jobMasterState.getNumber()));
        this.masterEphemZNode.start();
        try {
            this.masterEphemZNode.waitForInitialCreate(10000L, TimeUnit.MILLISECONDS);
            LOG.info("An ephemeral znode is created for the job master: " + this.masterEphemZNode.getActualPath());
        } catch (InterruptedException e) {
            LOG.log(Level.SEVERE, "Could not create job master znode.", (Throwable) e);
            throw new RuntimeException("Could not create job master znode", e);
        }
    }

    private void addEphemChildrenCacheListener(PathChildrenCache pathChildrenCache) {
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: edu.iu.dsc.tws.master.server.ZKMasterController.1
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                switch (AnonymousClass4.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                    case 1:
                        ZKMasterController.this.workerZnodeAdded(pathChildrenCacheEvent);
                        return;
                    case 2:
                        ZKMasterController.this.workerZnodeRemoved(pathChildrenCacheEvent);
                        return;
                    default:
                        return;
                }
            }
        });
    }

    private void addPersChildrenCacheListener(PathChildrenCache pathChildrenCache) {
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: edu.iu.dsc.tws.master.server.ZKMasterController.2
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                switch (AnonymousClass4.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                    case JobMasterContext.JM_TO_DASHBOARD_CONNECTIONS_DEFAULT /* 3 */:
                        ZKMasterController.this.childZnodeUpdated(pathChildrenCacheEvent);
                        return;
                    default:
                        return;
                }
            }
        });
    }

    private void addBarrierChildrenCacheListener(PathChildrenCache pathChildrenCache) {
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: edu.iu.dsc.tws.master.server.ZKMasterController.3
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                switch (AnonymousClass4.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                    case 1:
                        ZKMasterController.this.barrierZnodeAdded(pathChildrenCacheEvent);
                        return;
                    default:
                        return;
                }
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void workerZnodeAdded(PathChildrenCacheEvent pathChildrenCacheEvent) {
        boolean isAllJoined = this.workerMonitor.isAllJoined();
        int workerIDFromEphemPath = ZKUtils.getWorkerIDFromEphemPath(pathChildrenCacheEvent.getData().getPath());
        WorkerWithState workerWithState = getWorkerWithState(workerIDFromEphemPath);
        if (workerWithState == null) {
            LOG.severe("worker[" + workerIDFromEphemPath + "] added, but its data can not be retrieved.");
            return;
        }
        if (workerWithState.getState() == JobMasterAPI.WorkerState.RESTARTED) {
            this.workerMonitor.restarted(workerWithState);
            publishWorkerRestarted(workerWithState);
        } else {
            if (workerWithState.getState() != JobMasterAPI.WorkerState.STARTED) {
                LOG.warning("Following worker joined with initial state of " + workerWithState.getState() + "Something must be wrong. Ignoring this event. WorkerInfo: " + workerWithState.getInfo());
                return;
            }
            this.workerMonitor.started(workerWithState);
        }
        if (isAllJoined || !this.workerMonitor.isAllJoined()) {
            return;
        }
        publishAllJoined();
    }

    private WorkerWithState getWorkerWithState(int i) {
        String workerPath = ZKUtils.workerPath(this.persDir, i);
        ChildData currentData = this.persChildrenCache.getCurrentData(workerPath);
        if (currentData != null) {
            return WorkerWithState.decode(currentData.getData());
        }
        try {
            return ZKPersStateManager.getWorkerWithState(this.client, workerPath);
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return null;
        }
    }

    public void publishWorkerRestarted(WorkerWithState workerWithState) {
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setRestarted(JobMasterAPI.WorkerRestarted.newBuilder().setWorkerInfo(workerWithState.getInfo()).build()).build());
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void publishWorkerFailed(int i) {
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setFailed(JobMasterAPI.WorkerFailed.newBuilder().setWorkerID(i).build()).build());
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void publishAllJoined() {
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.workerMonitor.getWorkerInfoList();
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setAllJoined(JobMasterAPI.AllWorkersJoined.newBuilder().addAllWorkerInfo(workerInfoList).setNumberOfWorkers(workerInfoList.size()).build()).build());
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void publishJobMasterRestarted() {
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setJmRestarted(JobMasterAPI.JobMasterRestarted.newBuilder().setNumberOfWorkers(this.numberOfWorkers).setJmAddress(this.jmAddress).build()).build());
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void workerZnodeRemoved(PathChildrenCacheEvent pathChildrenCacheEvent) {
        String path = pathChildrenCacheEvent.getData().getPath();
        int workerIDFromEphemPath = ZKUtils.getWorkerIDFromEphemPath(path);
        if (this.scaledDownWorkers.contains(Integer.valueOf(workerIDFromEphemPath))) {
            this.scaledDownWorkers.remove(Integer.valueOf(workerIDFromEphemPath));
            LOG.info("Removed scaled down worker: " + workerIDFromEphemPath);
            return;
        }
        try {
            WorkerWithState workerWithState = ZKPersStateManager.getWorkerWithState(this.client, this.rootPath, this.jobID, workerIDFromEphemPath);
            if (workerWithState == null) {
                LOG.severe("worker[" + workerIDFromEphemPath + "] removed, but its data can not be retrieved.");
                return;
            }
            String decodeWorkerZnodeBody = ZKEphemStateManager.decodeWorkerZnodeBody(pathChildrenCacheEvent.getData().getData());
            if (workerWithState.getState() == JobMasterAPI.WorkerState.COMPLETED) {
                return;
            }
            if ("DELETED_BY_RESTARTING_WORKER".equals(decodeWorkerZnodeBody)) {
                LOG.info("Restarting worker deleted znode from previous run: " + path);
                return;
            }
            LOG.info(String.format("Worker[%s] FAILED. Worker last status: %s", Integer.valueOf(workerIDFromEphemPath), workerWithState.getState()));
            this.workerMonitor.failed(workerIDFromEphemPath);
            try {
                ZKPersStateManager.updateWorkerStatus(this.client, this.rootPath, this.jobID, workerWithState.getInfo(), JobMasterAPI.WorkerState.FAILED);
            } catch (Twister2Exception e) {
                LOG.log(Level.SEVERE, e.getMessage(), e);
            }
            publishWorkerFailed(workerWithState.getWorkerID());
        } catch (Twister2Exception e2) {
            LOG.log(Level.SEVERE, "worker[" + workerIDFromEphemPath + "] removed, but its data can not be retrieved.", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void childZnodeUpdated(PathChildrenCacheEvent pathChildrenCacheEvent) {
        int workerIDFromPersPath = ZKUtils.getWorkerIDFromPersPath(pathChildrenCacheEvent.getData().getPath());
        WorkerWithState decode = WorkerWithState.decode(pathChildrenCacheEvent.getData().getData());
        LOG.fine(String.format("Worker[%s] status changed to: %s ", Integer.valueOf(workerIDFromPersPath), decode.getState()));
        if (decode.getState() == JobMasterAPI.WorkerState.COMPLETED) {
            this.workerMonitor.completed(workerIDFromPersPath);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void barrierZnodeAdded(PathChildrenCacheEvent pathChildrenCacheEvent) {
        if (this.barrierChildrenCache.getCurrentData().size() == this.numberOfWorkers) {
            try {
                ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setAllArrived(JobMasterAPI.AllArrivedOnBarrier.newBuilder().setNumberOfWorkers(this.numberOfWorkers).build()).build());
            } catch (Twister2Exception e) {
                LOG.log(Level.SEVERE, e.getMessage(), e);
            }
        }
    }

    public void close() {
        CloseableUtils.closeQuietly(this.ephemChildrenCache);
        CloseableUtils.closeQuietly(this.persChildrenCache);
        CloseableUtils.closeQuietly(this.barrierChildrenCache);
        if (this.masterEphemZNode != null) {
            CloseableUtils.closeQuietly(this.masterEphemZNode);
        }
    }
}
