package edu.iu.dsc.tws.master.barrier;

import com.google.common.primitives.Longs;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.Twister2Exception;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.api.faulttolerance.FaultToleranceContext;
import edu.iu.dsc.tws.common.zk.ZKBarrierManager;
import edu.iu.dsc.tws.common.zk.ZKContext;
import edu.iu.dsc.tws.common.zk.ZKEventsManager;
import edu.iu.dsc.tws.common.zk.ZKUtils;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.CloseableUtils;

/* loaded from: input_file:edu/iu/dsc/tws/master/barrier/ZKBarrierHandler.class */
public class ZKBarrierHandler implements BarrierResponder {
    private static final Logger LOG = Logger.getLogger(ZKBarrierHandler.class.getName());
    private BarrierMonitor barrierMonitor;
    private Config config;
    private String jobID;
    private int numberOfWorkers;
    private CuratorFramework client;
    private PathChildrenCache defaultBarrierCache;
    private PathChildrenCache initBarrierCache;
    private String rootPath;

    /* renamed from: edu.iu.dsc.tws.master.barrier.ZKBarrierHandler$2, reason: invalid class name */
    /* loaded from: input_file:edu/iu/dsc/tws/master/barrier/ZKBarrierHandler$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type = new int[PathChildrenCacheEvent.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_ADDED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[PathChildrenCacheEvent.Type.CHILD_REMOVED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public ZKBarrierHandler(BarrierMonitor barrierMonitor, Config config, String str, int i) {
        this.barrierMonitor = barrierMonitor;
        this.config = config;
        this.jobID = str;
        this.numberOfWorkers = i;
        this.rootPath = ZKContext.rootNode(config);
    }

    public void initialize(JobMasterAPI.JobMasterState jobMasterState) throws Twister2Exception {
        if (jobMasterState != JobMasterAPI.JobMasterState.JM_STARTED && jobMasterState != JobMasterAPI.JobMasterState.JM_RESTARTED) {
            throw new Twister2Exception("initialState has to be either JobMasterState.JM_STARTED or JobMasterState.JM_RESTARTED. Supplied value: " + jobMasterState);
        }
        try {
            this.client = ZKUtils.connectToServer(ZKContext.serverAddresses(this.config), FaultToleranceContext.sessionTimeout(this.config));
            if (jobMasterState == JobMasterAPI.JobMasterState.JM_RESTARTED) {
                this.defaultBarrierCache = new PathChildrenCache(this.client, ZKUtils.defaultBarrierDir(this.rootPath, this.jobID), true);
                addBarrierChildrenCacheListener(this.defaultBarrierCache, JobMasterAPI.BarrierType.DEFAULT);
                this.defaultBarrierCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                TreeSet treeSet = new TreeSet();
                long initialWorkersAtBarrier = getInitialWorkersAtBarrier(this.defaultBarrierCache, treeSet);
                if (!treeSet.isEmpty()) {
                    this.barrierMonitor.initDefaultAfterRestart(treeSet, initialWorkersAtBarrier, this.numberOfWorkers);
                    LOG.info("Existing workers at default barrier: " + treeSet.size());
                    treeSet.clear();
                }
                this.initBarrierCache = new PathChildrenCache(this.client, ZKUtils.initBarrierDir(this.rootPath, this.jobID), true);
                addBarrierChildrenCacheListener(this.initBarrierCache, JobMasterAPI.BarrierType.INIT);
                this.initBarrierCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                long initialWorkersAtBarrier2 = getInitialWorkersAtBarrier(this.initBarrierCache, treeSet);
                if (!treeSet.isEmpty()) {
                    this.barrierMonitor.initInitAfterRestart(treeSet, initialWorkersAtBarrier2, this.numberOfWorkers);
                    LOG.info("Existing workers at init barrier: " + treeSet);
                }
            } else {
                this.defaultBarrierCache = new PathChildrenCache(this.client, ZKUtils.defaultBarrierDir(this.rootPath, this.jobID), true);
                addBarrierChildrenCacheListener(this.defaultBarrierCache, JobMasterAPI.BarrierType.DEFAULT);
                this.defaultBarrierCache.start();
                this.initBarrierCache = new PathChildrenCache(this.client, ZKUtils.initBarrierDir(this.rootPath, this.jobID), true);
                addBarrierChildrenCacheListener(this.initBarrierCache, JobMasterAPI.BarrierType.INIT);
                this.initBarrierCache.start();
            }
        } catch (Exception e) {
            throw new Twister2Exception("Exception when initializing ZKMasterController.", e);
        } catch (Twister2Exception e2) {
            throw e2;
        }
    }

    private long getInitialWorkersAtBarrier(PathChildrenCache pathChildrenCache, Set<Integer> set) {
        Iterator it = pathChildrenCache.getCurrentData().iterator();
        while (it.hasNext()) {
            String path = ((ChildData) it.next()).getPath();
            set.add(Integer.valueOf(ZKUtils.getWorkerIDFromPersPath(path)));
            if (0 == 0) {
                try {
                    ZKBarrierManager.readWorkerTimeout(this.client, path);
                } catch (Twister2Exception e) {
                    throw new Twister2RuntimeException(e);
                }
            }
        }
        return 0L;
    }

    private void addBarrierChildrenCacheListener(PathChildrenCache pathChildrenCache, final JobMasterAPI.BarrierType barrierType) {
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // from class: edu.iu.dsc.tws.master.barrier.ZKBarrierHandler.1
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) {
                int workerIDFromPersPath = ZKUtils.getWorkerIDFromPersPath(pathChildrenCacheEvent.getData().getPath());
                long fromByteArray = Longs.fromByteArray(pathChildrenCacheEvent.getData().getData());
                switch (AnonymousClass2.$SwitchMap$org$apache$curator$framework$recipes$cache$PathChildrenCacheEvent$Type[pathChildrenCacheEvent.getType().ordinal()]) {
                    case 1:
                        if (barrierType == JobMasterAPI.BarrierType.DEFAULT) {
                            ZKBarrierHandler.this.barrierMonitor.arrivedAtDefault(workerIDFromPersPath, fromByteArray);
                            return;
                        } else {
                            if (barrierType == JobMasterAPI.BarrierType.INIT) {
                                ZKBarrierHandler.this.barrierMonitor.arrivedAtInit(workerIDFromPersPath, fromByteArray);
                                return;
                            }
                            return;
                        }
                    case 2:
                        if (barrierType == JobMasterAPI.BarrierType.DEFAULT) {
                            ZKBarrierHandler.this.barrierMonitor.removedFromDefault(workerIDFromPersPath);
                            return;
                        } else {
                            if (barrierType == JobMasterAPI.BarrierType.INIT) {
                                ZKBarrierHandler.this.barrierMonitor.removedFromInit(workerIDFromPersPath);
                                return;
                            }
                            return;
                        }
                    default:
                        return;
                }
            }
        });
    }

    @Override // edu.iu.dsc.tws.master.barrier.BarrierResponder
    public void allArrived(JobMasterAPI.BarrierType barrierType) {
        publishBarrierDone(barrierType, JobMasterAPI.BarrierResult.SUCCESS);
    }

    @Override // edu.iu.dsc.tws.master.barrier.BarrierResponder
    public void barrierFailed(JobMasterAPI.BarrierType barrierType, JobMasterAPI.BarrierResult barrierResult) {
        publishBarrierDone(barrierType, barrierResult);
    }

    public void publishBarrierDone(JobMasterAPI.BarrierType barrierType, JobMasterAPI.BarrierResult barrierResult) {
        try {
            ZKEventsManager.publishEvent(this.client, this.rootPath, this.jobID, JobMasterAPI.JobEvent.newBuilder().setBarrierDone(JobMasterAPI.BarrierDone.newBuilder().setBarrierType(barrierType).setResult(barrierResult).build()).build());
        } catch (Twister2Exception e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
        }
    }

    public void close() {
        CloseableUtils.closeQuietly(this.defaultBarrierCache);
        CloseableUtils.closeQuietly(this.initBarrierCache);
    }
}
