package io.mantisrx.server.master;

import com.netflix.fenzo.AutoScaleAction;
import com.netflix.fenzo.AutoScaleRule;
import com.netflix.fenzo.SchedulingResult;
import com.netflix.fenzo.TaskAssignmentResult;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.TaskSchedulingService;
import com.netflix.fenzo.VMAssignmentResult;
import com.netflix.fenzo.VirtualMachineCurrentState;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.queues.TaskQueue;
import com.netflix.fenzo.queues.TaskQueueException;
import com.netflix.fenzo.queues.tiered.TieredQueue;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.GaugeCallback;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.scheduler.JobMessageRouter;
import io.mantisrx.server.master.scheduler.LaunchTaskRequest;
import io.mantisrx.server.master.scheduler.MantisScheduler;
import io.mantisrx.server.master.scheduler.ScheduleRequest;
import io.mantisrx.server.master.scheduler.SchedulingStateManager;
import io.mantisrx.server.master.scheduler.WorkerLaunchFailed;
import io.mantisrx.server.master.scheduler.WorkerLaunched;
import io.mantisrx.server.master.scheduler.WorkerRegistry;
import io.mantisrx.server.master.scheduler.WorkerUnscheduleable;
import io.mantisrx.shaded.com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.HdrHistogram.SynchronizedHistogram;
import org.apache.mesos.Protos;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

/* loaded from: input_file:io/mantisrx/server/master/SchedulingService.class */
public class SchedulingService extends BaseService implements MantisScheduler {
    // Class-wide logger.
    private static final Logger logger = LoggerFactory.getLogger(SchedulingService.class);
    // Destination for WorkerLaunched / WorkerLaunchFailed / WorkerUnscheduleable events.
    private final JobMessageRouter jobMessageRouter;
    // Queried for worker accepted-at times, running workers and worker validity.
    private final WorkerRegistry workerRegistry;
    // Fenzo scheduler built in the constructor via setupTaskSchedulerAndAutoScaler.
    private final TaskScheduler taskScheduler;
    // Fenzo scheduling loop built in the constructor via setupTaskSchedulingService.
    private final TaskSchedulingService taskSchedulingService;
    // Queue that scheduleWorker() puts requests on; created as TieredQueue(2).
    private final TieredQueue taskQueue;

    // ---- Metrics: all looked up from the Metrics group registered in the constructor. ----
    private final Counter numWorkersLaunched;
    private final Counter numResourceOffersReceived;
    private final Counter numResourceAllocations;
    private final Counter numResourceOffersRejected;
    private final Gauge workersToLaunch;
    private final Gauge pendingWorkers;
    private final Gauge schedulerRunMillis;
    // Registered under the metric name "perWorkerSchedulingTimeMillis".
    private final Counter perWorkerSchedulingTimeMs;
    // Histogram of (launch time - accepted time); its percentiles are exposed as callback gauges.
    private final SynchronizedHistogram workerAcceptedToLaunchedDistMs;
    private final Gauge totalActiveAgents;
    private final Counter numAgentsUsed;
    private final Gauge idleAgents;
    private final Gauge totalAvailableCPUs;
    private final Gauge totalAllocatedCPUs;
    private final Gauge totalAvailableMemory;
    private final Gauge totalAllocatedMemory;
    private final Gauge totalAvailableNwMbps;
    private final Gauge totalAllocatedNwMbps;
    private final Gauge cpuUtilization;
    private final Gauge memoryUtilization;
    private final Gauge networkUtilization;
    private final Gauge dominantResUtilization;
    private final Gauge fenzoLaunchedTasks;
    private final Gauge jobMgrRunningWorkers;
    private final Counter numAutoScaleUpActions;
    private final Counter numAutoScaleDownActions;
    private final Counter numMissingWorkerPorts;
    private final Counter schedulingResultExceptions;
    private final Counter schedulingCallbackExceptions;

    // Holds the most recent VM state list handed back by Fenzo; served by getCurrentVMState().
    private final SchedulingStateManager schedulingState;
    // Last idle-VM count that was logged, so the count is only logged when it changes.
    private final AtomicInteger idleMachinesCount;
    // Name of the lease attribute passed to getAttribute() when building WorkerLaunched events.
    private final String slaveClusterAttributeName;
    // Interval between VM current-state checks; presumably milliseconds — TODO confirm.
    private final long vmCurrentStatesCheckInterval = 10000;
    // Wall-clock time (System.currentTimeMillis) of the last VM current-state check.
    private final AtomicLong lastVmCurrentStatesCheckDone;
    // Used to reject leases, launch tasks and kill tasks.
    private VirtualMachineMasterService virtualMachineService;
    // Passed to TaskSchedulingService.Builder.withLoopIntervalMillis; read from configuration in the constructor.
    private long SCHEDULING_ITERATION_INTERVAL_MILLIS;
    // Passed to TaskSchedulingService.Builder.withMaxDelayMillis.
    private long MAX_DELAY_MILLIS_BETWEEN_SCHEDULING_ITER;
    // Wall-clock time of the latest scheduling result callback; feeds the timeSinceLastSchedulingRunMs gauge.
    private AtomicLong lastSchedulingResultCallback;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.mantisrx.server.master.SchedulingService$3, reason: invalid class name */
    /* loaded from: input_file:io/mantisrx/server/master/SchedulingService$3.class */
    /**
     * Compiler-synthesised lookup table for a {@code switch} over {@link AutoScaleAction.Type}.
     *
     * <p>The array is indexed by {@code Type.ordinal()} and holds the switch case label:
     * {@code Up} maps to 1 and {@code Down} maps to 2. Any other constant keeps the default
     * array value 0, which matches no case.
     */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$com$netflix$fenzo$AutoScaleAction$Type = new int[AutoScaleAction.Type.values().length];

        static {
            // Each entry is guarded separately: a constant missing at runtime raises
            // NoSuchFieldError, which is ignored so the remaining entries are still filled in.
            try {
                $SwitchMap$com$netflix$fenzo$AutoScaleAction$Type[AutoScaleAction.Type.Up.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$netflix$fenzo$AutoScaleAction$Type[AutoScaleAction.Type.Down.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Builds the Fenzo scheduler, the scheduling loop and the metrics published by this service.
     *
     * @param jobMessageRouter router that receives worker launch / failure / unscheduleable events
     * @param workerRegistry registry queried for worker state while scheduling and reporting metrics
     * @param observable stream of lease identifiers to expire; the value {@code "ALL"} expires every lease
     * @param virtualMachineMasterService service used to reject leases and to launch and kill tasks;
     *     must not be {@code null}
     */
    public SchedulingService(JobMessageRouter jobMessageRouter, WorkerRegistry workerRegistry, Observable<String> observable, VirtualMachineMasterService virtualMachineMasterService) {
        super(true);
        this.workerAcceptedToLaunchedDistMs = new SynchronizedHistogram(3600000L, 3);
        this.idleMachinesCount = new AtomicInteger();
        // vmCurrentStatesCheckInterval is a final field initialised at its declaration, so it
        // must not be assigned again here.
        this.lastVmCurrentStatesCheckDone = new AtomicLong(System.currentTimeMillis());
        this.MAX_DELAY_MILLIS_BETWEEN_SCHEDULING_ITER = 5000L;
        this.lastSchedulingResultCallback = new AtomicLong(System.currentTimeMillis());
        this.schedulingState = new SchedulingStateManager();
        this.jobMessageRouter = jobMessageRouter;
        this.workerRegistry = workerRegistry;
        this.virtualMachineService = virtualMachineMasterService;
        this.slaveClusterAttributeName = ConfigurationProvider.getConfig().getSlaveClusterAttributeName();
        this.SCHEDULING_ITERATION_INTERVAL_MILLIS = ConfigurationProvider.getConfig().getSchedulerIterationIntervalMillis();

        AgentFitnessCalculator fitnessCalculator = new AgentFitnessCalculator();
        // Evaluating the method reference throws NullPointerException when the service is null,
        // so no separate null check is required.
        TaskScheduler.Builder schedulerBuilder = new TaskScheduler.Builder()
                .withLeaseRejectAction(virtualMachineMasterService::rejectLease)
                .withLeaseOfferExpirySecs(ConfigurationProvider.getConfig().getMesosLeaseOfferExpirySecs())
                .withFitnessCalculator(fitnessCalculator)
                .withFitnessGoodEnoughFunction(fitnessCalculator.getFitnessGoodEnoughFunc())
                .withAutoScaleByAttributeName(ConfigurationProvider.getConfig().getAutoscaleByAttributeName());
        if (ConfigurationProvider.getConfig().getDisableShortfallEvaluation()) {
            schedulerBuilder = schedulerBuilder.disableShortfallEvaluation();
        }
        this.taskScheduler = setupTaskSchedulerAndAutoScaler(observable, schedulerBuilder);
        this.taskScheduler.setActiveVmGroupAttributeName(ConfigurationProvider.getConfig().getActiveSlaveAttributeName());
        // The queue must exist before the scheduling service is built, because the builder reads it.
        this.taskQueue = new TieredQueue(2);
        this.taskSchedulingService = setupTaskSchedulingService(this.taskScheduler);
        setupAutoscaleRulesDynamicUpdater();

        MetricGroupId metricGroupId = new MetricGroupId(SchedulingService.class.getCanonicalName());
        Metrics metrics = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder()
                .id(metricGroupId)
                .addCounter("numWorkersLaunched")
                .addCounter("numResourceOffersReceived")
                .addCounter("numResourceAllocations")
                .addCounter("numResourceOffersRejected")
                .addGauge("workersToLaunch")
                .addGauge("pendingWorkers")
                .addGauge("schedulerRunMillis")
                .addCounter("perWorkerSchedulingTimeMillis")
                .addGauge(new GaugeCallback(metricGroupId, "workerAcceptedToLaunchedMsP50",
                        () -> Double.valueOf(this.workerAcceptedToLaunchedDistMs.getValueAtPercentile(50.0d))))
                .addGauge(new GaugeCallback(metricGroupId, "workerAcceptedToLaunchedMsP95",
                        () -> Double.valueOf(this.workerAcceptedToLaunchedDistMs.getValueAtPercentile(95.0d))))
                .addGauge(new GaugeCallback(metricGroupId, "workerAcceptedToLaunchedMsP99",
                        () -> Double.valueOf(this.workerAcceptedToLaunchedDistMs.getValueAtPercentile(99.0d))))
                .addGauge(new GaugeCallback(metricGroupId, "workerAcceptedToLaunchedMsMax",
                        () -> Double.valueOf(this.workerAcceptedToLaunchedDistMs.getValueAtPercentile(100.0d))))
                .addGauge("totalActiveAgents")
                .addCounter("numAgentsUsed")
                .addGauge("idleAgents")
                .addGauge("totalAvailableCPUs")
                .addGauge("totalAllocatedCPUs")
                .addGauge("totalAvailableMemory")
                .addGauge("totalAllocatedMemory")
                .addGauge("totalAvailableNwMbps")
                .addGauge("totalAllocatedNwMbps")
                .addGauge("cpuUtilization")
                .addGauge("memoryUtilization")
                .addGauge("networkUtilization")
                .addGauge("dominantResUtilization")
                .addCounter("numAutoScaleUpActions")
                .addCounter("numAutoScaleDownActions")
                .addGauge("fenzoLaunchedTasks")
                .addGauge("jobMgrRunningWorkers")
                .addCounter("numMissingWorkerPorts")
                .addCounter("schedulingResultExceptions")
                .addCounter("schedulingCallbackExceptions")
                .build());

        this.numWorkersLaunched = metrics.getCounter("numWorkersLaunched");
        this.numResourceOffersReceived = metrics.getCounter("numResourceOffersReceived");
        this.numResourceAllocations = metrics.getCounter("numResourceAllocations");
        this.numResourceOffersRejected = metrics.getCounter("numResourceOffersRejected");
        this.workersToLaunch = metrics.getGauge("workersToLaunch");
        this.pendingWorkers = metrics.getGauge("pendingWorkers");
        this.schedulerRunMillis = metrics.getGauge("schedulerRunMillis");
        this.totalActiveAgents = metrics.getGauge("totalActiveAgents");
        this.numAgentsUsed = metrics.getCounter("numAgentsUsed");
        this.idleAgents = metrics.getGauge("idleAgents");
        this.totalAvailableCPUs = metrics.getGauge("totalAvailableCPUs");
        this.totalAllocatedCPUs = metrics.getGauge("totalAllocatedCPUs");
        this.totalAvailableMemory = metrics.getGauge("totalAvailableMemory");
        this.totalAllocatedMemory = metrics.getGauge("totalAllocatedMemory");
        this.totalAvailableNwMbps = metrics.getGauge("totalAvailableNwMbps");
        this.totalAllocatedNwMbps = metrics.getGauge("totalAllocatedNwMbps");
        this.cpuUtilization = metrics.getGauge("cpuUtilization");
        this.memoryUtilization = metrics.getGauge("memoryUtilization");
        this.networkUtilization = metrics.getGauge("networkUtilization");
        this.dominantResUtilization = metrics.getGauge("dominantResUtilization");
        this.numAutoScaleUpActions = metrics.getCounter("numAutoScaleUpActions");
        this.numAutoScaleDownActions = metrics.getCounter("numAutoScaleDownActions");
        this.fenzoLaunchedTasks = metrics.getGauge("fenzoLaunchedTasks");
        this.jobMgrRunningWorkers = metrics.getGauge("jobMgrRunningWorkers");
        this.numMissingWorkerPorts = metrics.getCounter("numMissingWorkerPorts");
        this.schedulingResultExceptions = metrics.getCounter("schedulingResultExceptions");
        this.schedulingCallbackExceptions = metrics.getCounter("schedulingCallbackExceptions");
        this.perWorkerSchedulingTimeMs = metrics.getCounter("perWorkerSchedulingTimeMillis");
    }

    /**
     * Completes the Fenzo scheduler configuration, wires lease expiry and hooks up the autoscaler.
     *
     * <p>Every rule currently known to the {@link AgentClustersAutoScaler} (if one exists) is
     * added to the builder. The maximum number of offers Fenzo may reject is the smallest
     * "min idle hosts to keep" across those rules, capped at 4 and never below 1.
     *
     * @param observable stream of lease identifiers to expire; {@code "ALL"} expires all leases
     * @param builder partially configured scheduler builder
     * @return the built scheduler
     */
    private TaskScheduler setupTaskSchedulerAndAutoScaler(Observable<String> observable, TaskScheduler.Builder builder) {
        int minIdleHosts = 4;
        TaskScheduler.Builder schedulerBuilder = builder
                .withAutoScaleDownBalancedByAttributeName(ConfigurationProvider.getConfig().getHostZoneAttributeName())
                .withAutoScalerMapHostnameAttributeName(ConfigurationProvider.getConfig().getAutoScalerMapHostnameAttributeName());
        AgentClustersAutoScaler autoScaler = AgentClustersAutoScaler.get();
        if (autoScaler != null) {
            try {
                Set<AutoScaleRule> rules = autoScaler.getRules();
                if (rules == null || rules.isEmpty()) {
                    logger.warn("No auto scale rules setup");
                } else {
                    for (AutoScaleRule rule : rules) {
                        schedulerBuilder = schedulerBuilder.withAutoScaleRule(rule);
                        minIdleHosts = Math.min(minIdleHosts, rule.getMinIdleHostsToKeep());
                    }
                }
            } catch (IllegalStateException e) {
                logger.warn("Ignoring: " + e.getMessage());
            }
        }
        final TaskScheduler scheduler = schedulerBuilder.withMaxOffersToReject(Math.max(1, minIdleHosts)).build();

        // Expire leases as requests arrive on the observable.
        observable.doOnNext(leaseId -> {
            if (leaseId.equals("ALL")) {
                scheduler.expireAllLeases();
            } else {
                scheduler.expireLease(leaseId);
            }
        }).subscribe();

        if (autoScaler != null) {
            final Observer<AutoScaleAction> autoScaleActionObserver = autoScaler.getAutoScaleActionObserver();
            scheduler.setAutoscalerCallback(autoScaleAction -> {
                try {
                    // Switch on the enum itself rather than on a hand-maintained ordinal table.
                    switch (autoScaleAction.getType()) {
                        case Up:
                            this.numAutoScaleUpActions.increment();
                            break;
                        case Down:
                            this.numAutoScaleDownActions.increment();
                            break;
                        default:
                            break;
                    }
                    autoScaleActionObserver.onNext(autoScaleAction);
                } catch (Exception e) {
                    // A failing observer must not break Fenzo's autoscaler loop.
                    logger.warn("Will continue after exception calling autoscale action observer: " + e.getMessage(), e);
                }
            });
        }
        return scheduler;
    }

    /**
     * Schedules a once-a-minute job that reconciles Fenzo's autoscale rules with the rules
     * currently published by the {@link AgentClustersAutoScaler}.
     *
     * <p>Each published rule is added or replaced in the scheduler. Any rule installed in the
     * scheduler that is no longer published is removed. Failures are logged and the job keeps
     * running.
     */
    private void setupAutoscaleRulesDynamicUpdater() {
        Schedulers.computation().createWorker().schedulePeriodically(() -> {
            try {
                logger.debug("Updating cluster autoscale rules");
                AgentClustersAutoScaler autoScaler = AgentClustersAutoScaler.get();
                if (autoScaler == null) {
                    logger.warn("No agent cluster autoscaler defined, not setting up Fenzo autoscaler rules");
                    return;
                }
                Set<AutoScaleRule> publishedRules = autoScaler.getRules();
                Collection<? extends AutoScaleRule> installedRules = this.taskScheduler.getAutoScaleRules();

                // Start with every installed rule name; whatever is left after the published
                // rules are applied is stale. A fresh set per run avoids sharing mutable state.
                Set<String> staleRuleNames = new HashSet<>();
                if (installedRules != null) {
                    for (AutoScaleRule installed : installedRules) {
                        staleRuleNames.add(installed.getRuleName());
                    }
                }
                if (publishedRules != null) {
                    for (AutoScaleRule published : publishedRules) {
                        logger.debug("Setting up autoscale rule: " + published);
                        this.taskScheduler.addOrReplaceAutoScaleRule(published);
                        staleRuleNames.remove(published.getRuleName());
                    }
                }
                for (String staleRuleName : staleRuleNames) {
                    logger.info("Removing autoscale rule " + staleRuleName);
                    this.taskScheduler.removeAutoScaleRule(staleRuleName);
                }
            } catch (Exception e) {
                // Pass the exception as well so the stack trace is not lost.
                logger.warn("Unexpected error updating cluster autoscale rules: " + e.getMessage(), e);
            }
        }, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Builds the Fenzo scheduling loop around the given scheduler.
     *
     * <p>The loop uses this service's task queue, iteration interval and maximum delay, and
     * reports each iteration's result to {@code schedulingResultHandler}.
     *
     * @param taskScheduler the scheduler the loop should drive
     * @return the configured, not yet started, scheduling service
     */
    private TaskSchedulingService setupTaskSchedulingService(TaskScheduler taskScheduler) {
        TaskSchedulingService.Builder serviceBuilder = new TaskSchedulingService.Builder();
        serviceBuilder = serviceBuilder.withTaskScheduler(taskScheduler);
        serviceBuilder = serviceBuilder.withLoopIntervalMillis(this.SCHEDULING_ITERATION_INTERVAL_MILLIS);
        serviceBuilder = serviceBuilder.withMaxDelayMillis(this.MAX_DELAY_MILLIS_BETWEEN_SCHEDULING_ITER);
        serviceBuilder = serviceBuilder.withSchedulingResultCallback(this::schedulingResultHandler);
        serviceBuilder = serviceBuilder.withTaskQueue(this.taskQueue);
        serviceBuilder = serviceBuilder.withOptimizingShortfallEvaluator();
        return serviceBuilder.build();
    }

    /**
     * Looks up the text value of a named attribute on a lease.
     *
     * @param virtualMachineLease lease whose attribute map is consulted
     * @param str attribute name
     * @return the attribute's text value, or empty when the lease has no attribute map, the
     *     attribute is absent, or its text has no value
     */
    private Optional<String> getAttribute(VirtualMachineLease virtualMachineLease, String str) {
        Map<String, Protos.Attribute> attributes = virtualMachineLease.getAttributeMap();
        if (attributes == null) {
            return Optional.empty();
        }
        Protos.Attribute attribute = attributes.get(str);
        if (attribute == null || !attribute.getText().hasValue()) {
            return Optional.empty();
        }
        return Optional.of(attribute.getText().getValue());
    }

    /**
     * Launches the tasks Fenzo assigned to one VM and reports launch failures.
     *
     * <p>For each assignment a {@link WorkerLaunched} event is routed first. Only assignments
     * whose ports could be built and whose event was accepted are launched; the rest are removed
     * from the scheduler again. If nothing is left to launch, every lease is rejected. Errors
     * returned by the VM service are routed as {@link WorkerLaunchFailed} events.
     *
     * @param collection task assignments for a single VM
     * @param list leases used for those assignments; the first lease supplies hostname, VM id and
     *     attributes
     */
    private void launchTasks(Collection<TaskAssignmentResult> collection, List<VirtualMachineLease> list) {
        List<LaunchTaskRequest> launchRequests = new ArrayList<>();
        for (TaskAssignmentResult assignment : collection) {
            // Fenzo hands back the generic TaskRequest; the tasks queued by this service are ScheduleRequests.
            ScheduleRequest request = (ScheduleRequest) assignment.getRequest();
            WorkerPorts workerPorts = null;
            try {
                workerPorts = new WorkerPorts(assignment.getAssignedPorts());
            } catch (IllegalArgumentException | IllegalStateException e) {
                logger.error("problem launching tasks for assignment result {}: {}", assignment, e);
                this.numMissingWorkerPorts.increment();
            }
            VirtualMachineLease lease = list.get(0);
            boolean launchAccepted = workerPorts != null
                    && this.jobMessageRouter.routeWorkerEvent(new WorkerLaunched(request.getWorkerId(), request.getStageNum(), lease.hostname(), lease.getVMID(), getAttribute(lease, this.slaveClusterAttributeName), workerPorts));
            if (launchAccepted) {
                launchRequests.add(new LaunchTaskRequest(request, workerPorts));
            } else {
                // Missing ports or a rejected event: give the assignment back to the scheduler.
                unscheduleWorker(request.getWorkerId(), Optional.ofNullable(lease.hostname()));
            }
        }
        if (launchRequests.isEmpty()) {
            for (VirtualMachineLease unusedLease : list) {
                this.virtualMachineService.rejectLease(unusedLease);
            }
        }
        Map<ScheduleRequest, LaunchTaskException> launchErrors = this.virtualMachineService.launchTasks(launchRequests, list);
        for (TaskAssignmentResult assignment : collection) {
            ScheduleRequest request = (ScheduleRequest) assignment.getRequest();
            if (launchErrors.containsKey(request)) {
                String errorMessage = getWorkerStringPrefix(request.getStageNum(), request.getWorkerId()) + " failed due to " + launchErrors.get(request).getMessage();
                if (!this.jobMessageRouter.routeWorkerEvent(new WorkerLaunchFailed(request.getWorkerId(), request.getStageNum(), errorMessage))) {
                    logger.warn("Failed to route WorkerLaunchFailed for {} (err {})", request.getWorkerId(), errorMessage);
                }
            }
        }
    }

    /**
     * Builds the human-readable worker description used as the prefix of launch-failure messages.
     *
     * @param i stage number
     * @param workerId worker whose index and number are included
     * @return text of the form {@code "stage <i> worker index=<index> number=<number>"}
     */
    private String getWorkerStringPrefix(int i, WorkerId workerId) {
        StringBuilder prefix = new StringBuilder();
        prefix.append("stage ").append(i);
        prefix.append(" worker index=").append(workerId.getWorkerIndex());
        prefix.append(" number=").append(workerId.getWorkerNum());
        return prefix.toString();
    }

    /**
     * Callback invoked with the result of each Fenzo scheduling iteration.
     *
     * <p>It launches the assigned tasks, routes {@link WorkerUnscheduleable} events for tasks that
     * could not be placed, updates the scheduling metrics, and asks Fenzo for the current VM
     * states. Any unexpected exception is logged and counted rather than propagated.
     *
     * @param schedulingResult result of one scheduling iteration
     */
    private void schedulingResultHandler(SchedulingResult schedulingResult) {
        try {
            this.lastSchedulingResultCallback.set(System.currentTimeMillis());
            for (Exception exc : schedulingResult.getExceptions()) {
                logger.error("Scheduling result got exception: {}", exc.getMessage(), exc);
                this.schedulingResultExceptions.increment();
            }
            int launchedCount = 0;
            int agentsUsed = 0;
            SchedulerCounters.getInstance().incrementResourceAllocationTrials(schedulingResult.getNumAllocations());
            Map<String, VMAssignmentResult> resultMap = schedulingResult.getResultMap();
            if (resultMap != null) {
                agentsUsed = resultMap.size();
                long now = System.currentTimeMillis();
                for (VMAssignmentResult vmResult : resultMap.values()) {
                    launchTasks(vmResult.getTasksAssigned(), vmResult.getLeasesUsed());
                    for (TaskAssignmentResult assigned : vmResult.getTasksAssigned()) {
                        ScheduleRequest request = (ScheduleRequest) assigned.getRequest();
                        this.workerRegistry.getAcceptedAt(request.getWorkerId())
                                .ifPresent(acceptedAt -> recordAcceptedToLaunchedMs(now - acceptedAt));
                        this.perWorkerSchedulingTimeMs.increment(now - request.getReadyAt());
                    }
                    launchedCount += vmResult.getTasksAssigned().size();
                }
            }
            Map<TaskRequest, List<TaskAssignmentResult>> failures = schedulingResult.getFailures();
            for (Map.Entry<TaskRequest, List<TaskAssignmentResult>> failure : failures.entrySet()) {
                ScheduleRequest failedRequest = (ScheduleRequest) failure.getKey();
                if (!this.jobMessageRouter.routeWorkerEvent(new WorkerUnscheduleable(failedRequest.getWorkerId(), failedRequest.getStageNum()))) {
                    logger.warn("Failed to route {} WorkerUnscheduleable event", failedRequest.getWorkerId());
                    if (logger.isTraceEnabled()) {
                        logger.trace("Unscheduleable worker {} assignmentresults {}", failedRequest.getWorkerId(), failure.getValue());
                    }
                }
            }
            this.numWorkersLaunched.increment(launchedCount);
            this.numResourceOffersReceived.increment(schedulingResult.getLeasesAdded());
            this.numResourceAllocations.increment(schedulingResult.getNumAllocations());
            this.numResourceOffersRejected.increment(schedulingResult.getLeasesRejected());
            int totalToLaunch = launchedCount + failures.size();
            this.workersToLaunch.set(totalToLaunch);
            this.pendingWorkers.set(failures.size());
            this.schedulerRunMillis.set(schedulingResult.getRuntime());
            this.totalActiveAgents.set(schedulingResult.getTotalVMsCount());
            this.numAgentsUsed.increment(agentsUsed);
            int idleVMsCount = schedulingResult.getIdleVMsCount();
            this.idleAgents.set(idleVMsCount);
            SchedulerCounters.getInstance().endIteration(totalToLaunch, launchedCount, agentsUsed, schedulingResult.getLeasesRejected());
            // Only log every tenth iteration, and only when there was something to schedule.
            if (totalToLaunch > 0 && SchedulerCounters.getInstance().getCounter().getIterationNumber() % 10 == 0) {
                logger.info("Scheduling iteration result: " + SchedulerCounters.getInstance().toJsonString());
            }
            if (idleVMsCount != this.idleMachinesCount.get()) {
                logger.info("Idle machines: " + idleVMsCount);
                this.idleMachinesCount.set(idleVMsCount);
            }
            try {
                this.taskSchedulingService.requestVmCurrentStates(vmStates -> {
                    // Refresh cached VM state and resource metrics at most once per check interval.
                    if (this.lastVmCurrentStatesCheckDone.get() < System.currentTimeMillis() - this.vmCurrentStatesCheckInterval) {
                        this.schedulingState.setVMCurrentState(vmStates);
                        verifyAndReportResUsageMetrics(vmStates);
                        this.lastVmCurrentStatesCheckDone.set(System.currentTimeMillis());
                    }
                });
            } catch (TaskQueueException e) {
                logger.warn("got exception requesting VM states from Fenzo", e);
            }
            publishJobManagerAndFenzoWorkerMetrics();
        } catch (Exception e) {
            logger.error("unexpected exception in scheduling result callback", e);
            this.schedulingCallbackExceptions.increment();
        }
    }

    /**
     * Records an accepted-to-launched latency, clamped to the histogram's trackable range.
     *
     * <p>The histogram rejects negative values and values above its highest trackable value with
     * an exception. Without clamping, one out-of-range latency would abort the scheduling
     * callback and skip launching tasks on the remaining VMs.
     */
    private void recordAcceptedToLaunchedMs(long elapsedMs) {
        long upperBound = this.workerAcceptedToLaunchedDistMs.getHighestTrackableValue();
        this.workerAcceptedToLaunchedDistMs.recordValue(Math.max(0L, Math.min(elapsedMs, upperBound)));
    }

    /**
     * Registers an already running worker with the Fenzo scheduling service.
     *
     * @param scheduleRequest request describing the running worker
     * @param str passed through as the second argument of {@code initializeRunningTask};
     *     presumably the hostname the worker runs on — TODO confirm
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void initializeRunningWorker(ScheduleRequest scheduleRequest, String str) {
        this.taskSchedulingService.initializeRunningTask(scheduleRequest, str);
    }

    /**
     * Queues a worker for scheduling by adding its request to the task queue.
     *
     * @param scheduleRequest the worker's scheduling request
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void scheduleWorker(ScheduleRequest scheduleRequest) {
        this.taskQueue.queueTask(scheduleRequest);
    }

    /**
     * Removes a worker's task from the Fenzo scheduling service.
     *
     * @param workerId worker whose task (identified by {@code workerId.getId()}) is removed
     * @param optional hostname argument for the removal; {@code null} is passed when empty
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void unscheduleWorker(WorkerId workerId, Optional<String> optional) {
        this.taskSchedulingService.removeTask(workerId.getId(), ScheduleRequest.DEFAULT_Q_ATTRIBUTES, optional.orElse(null));
    }

    /**
     * Removes a worker's task from the Fenzo scheduling service and then asks the VM service to
     * kill it.
     *
     * @param workerId worker to remove and kill
     * @param optional hostname argument for the removal; {@code null} is passed when empty
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void unscheduleAndTerminateWorker(WorkerId workerId, Optional<String> optional) {
        final String taskId = workerId.getId();
        final String hostname = optional.orElse(null);
        this.taskSchedulingService.removeTask(taskId, ScheduleRequest.DEFAULT_Q_ATTRIBUTES, hostname);
        this.virtualMachineService.killTask(workerId);
    }

    /**
     * Sets the time at which a queued worker becomes eligible for scheduling.
     *
     * @param workerId worker whose task ready time is updated
     * @param j the new ready time, passed unchanged to {@code setTaskReadyTime}; it is also
     *     given to {@code new DateTime(long)} for trace logging, which treats it as epoch millis
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void updateWorkerSchedulingReadyTime(WorkerId workerId, long j) {
        if (logger.isTraceEnabled()) {
            logger.trace("setting task {} ready time to {}", workerId, new DateTime(j));
        }
        // Use getId() as the task id, as every other scheduler call in this class does,
        // instead of relying on toString() happening to return the same text.
        this.taskSchedulingService.setTaskReadyTime(workerId.getId(), ScheduleRequest.DEFAULT_Q_ATTRIBUTES, j);
    }

    /**
     * Expires a single lease, or every lease when the argument is {@code "ALL"}.
     *
     * @param str lease identifier handed to {@code expireLease}, or the literal {@code "ALL"}
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void rescindOffer(String str) {
        final boolean expireEverything = str.equals("ALL");
        if (!expireEverything) {
            this.taskScheduler.expireLease(str);
            return;
        }
        this.taskScheduler.expireAllLeases();
    }

    /**
     * Hands new resource offers to the Fenzo scheduling service.
     *
     * @param list leases to add
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void addOffers(List<VirtualMachineLease> list) {
        this.taskSchedulingService.addLeases(list);
    }

    /**
     * Expires all leases selected by the given key.
     *
     * @param str passed to {@code TaskScheduler.expireAllLeases(String)}; presumably a
     *     hostname — TODO confirm
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void rescindOffers(String str) {
        this.taskScheduler.expireAllLeases(str);
    }

    /**
     * Disables a VM in the Fenzo scheduler.
     *
     * @param str VM identifier passed to {@code TaskScheduler.disableVM}; presumably a
     *     hostname — TODO confirm
     * @param j duration argument passed to {@code TaskScheduler.disableVM}; presumably
     *     milliseconds — TODO confirm
     * @throws IllegalStateException declared by this method; presumably propagated from the
     *     scheduler — TODO confirm
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void disableVM(String str, long j) throws IllegalStateException {
        this.taskScheduler.disableVM(str, j);
    }

    /**
     * Re-enables a VM in the Fenzo scheduler.
     *
     * @param str VM identifier passed to {@code TaskScheduler.enableVM}; presumably a
     *     hostname — TODO confirm
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void enableVM(String str) {
        this.taskScheduler.enableVM(str);
    }

    /**
     * Returns the VM state list held by the scheduling state manager.
     *
     * <p>This reads the cached state; it does not query Fenzo.
     *
     * @return the cached VM states
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public List<VirtualMachineCurrentState> getCurrentVMState() {
        return this.schedulingState.getVMCurrentState();
    }

    /**
     * Sets the active VM groups on the Fenzo scheduler.
     *
     * @param list the active VM group names; a {@code null} list is ignored
     */
    @Override // io.mantisrx.server.master.scheduler.MantisScheduler
    public void setActiveVmGroups(List<String> list) {
        if (list == null) {
            return;
        }
        this.taskScheduler.setActiveVmGroups(list);
    }

    /**
     * Registers the {@code SchedulingServiceWatcher} gauge, which reports the milliseconds
     * elapsed since the last scheduling result callback.
     *
     * <p>The callback timestamp is reset to "now" first, so the gauge starts near zero.
     */
    private void setupSchedulingServiceWatcherMetric() {
        logger.info("Setting up SchedulingServiceWatcher metrics");
        this.lastSchedulingResultCallback.set(System.currentTimeMillis());
        MetricsRegistry registry = MetricsRegistry.getInstance();
        Metrics.Builder watcherBuilder = new Metrics.Builder().id("SchedulingServiceWatcher", new Tag[0]);
        MetricId gaugeId = new MetricId("SchedulingServiceWatcher", "timeSinceLastSchedulingRunMs");
        GaugeCallback sinceLastRun = new GaugeCallback(
                gaugeId,
                () -> Double.valueOf(System.currentTimeMillis() - this.lastSchedulingResultCallback.get()),
                SpectatorRegistryFactory.getRegistry());
        registry.registerAndGet(watcherBuilder.addGauge(sinceLastRun).build());
    }

    /**
     * Starts the scheduling service through {@code awaitActiveModeAndStart}.
     *
     * <p>The supplied action starts the Fenzo scheduling loop and registers the watcher metric.
     * When debug logging is enabled it also logs every task Fenzo knows about, grouped by state.
     */
    public void start() {
        super.awaitActiveModeAndStart(() -> {
            logger.info("Scheduling service starting now");
            this.taskSchedulingService.start();
            setupSchedulingServiceWatcherMetric();
            if (!logger.isDebugEnabled()) {
                return;
            }
            try {
                this.taskSchedulingService.requestAllTasks(tasksByState ->
                        tasksByState.forEach((state, tasks) ->
                                logger.debug("state {} tasks {}", state, tasks.toString())));
            } catch (TaskQueueException e) {
                logger.error("caught exception", e);
            }
        });
    }

    /**
     * Publishes how many tasks Fenzo considers launched next to how many workers the worker
     * registry considers running, and logs an error when the two counts differ.
     *
     * <p>With debug logging enabled, the ids present on only one side are logged as well. Tasks in
     * any state other than {@code LAUNCHED} are only debug-logged.
     */
    private void publishJobManagerAndFenzoWorkerMetrics() {
        try {
            this.taskSchedulingService.requestAllTasks(tasksByState -> tasksByState.forEach((state, tasks) -> {
                int taskCount = tasks.size();
                if (state == TaskQueue.TaskState.LAUNCHED) {
                    int runningWorkers = this.workerRegistry.getNumRunningWorkers();
                    this.fenzoLaunchedTasks.set(taskCount);
                    this.jobMgrRunningWorkers.set(runningWorkers);
                    if (runningWorkers == taskCount) {
                        return;
                    }
                    logger.error("{} running workers as per Job Manager, {} tasks launched as per Fenzo", runningWorkers, taskCount);
                    if (logger.isDebugEnabled()) {
                        Set<String> jobManagerIds = this.workerRegistry.getAllRunningWorkers().stream()
                                .map(worker -> worker.getId())
                                .collect(Collectors.toSet());
                        Set<String> fenzoIds = tasks.stream()
                                .map(task -> task.getId())
                                .collect(Collectors.toSet());
                        logger.debug("Job Manager workers not in Fenzo {}", Sets.difference(jobManagerIds, fenzoIds));
                        logger.debug("Fenzo workers not in JobManagers {}", Sets.difference(fenzoIds, jobManagerIds));
                    }
                } else {
                    logger.debug("{} {} tasks {}", taskCount, state, tasks);
                }
            }));
        } catch (Exception e) {
            logger.error("caught exception when publishing worker metrics", e);
        }
    }

    /**
     * Aggregates CPU, memory and network totals across all VMs and publishes them as gauges.
     *
     * <p>"Available" totals are the VM's currently available resources plus the resources of its
     * valid running tasks. "Allocated" totals are the resources of valid running tasks only.
     * Utilization is allocated * 100 / available, and the dominant utilization is the largest of
     * the three. A running task whose id does not parse to a worker id, or whose worker the
     * registry reports as invalid, is removed from the scheduling service instead of counted.
     *
     * @param list current state of every VM as reported by Fenzo
     */
    private void verifyAndReportResUsageMetrics(List<VirtualMachineCurrentState> list) {
        double cpusTotal = 0.0d;
        double cpusUsed = 0.0d;
        double memoryTotal = 0.0d;
        double memoryUsed = 0.0d;
        double networkTotal = 0.0d;
        double networkUsed = 0.0d;
        for (VirtualMachineCurrentState vmState : list) {
            VirtualMachineLease available = vmState.getCurrAvailableResources();
            if (available != null) {
                cpusTotal += available.cpuCores();
                memoryTotal += available.memoryMB();
                networkTotal += available.networkMbps();
            }
            Collection<TaskRequest> runningTasks = vmState.getRunningTasks();
            if (runningTasks == null) {
                continue;
            }
            for (TaskRequest task : runningTasks) {
                Optional<WorkerId> workerId = WorkerId.fromId(task.getId());
                boolean validWorker = workerId.isPresent() && this.workerRegistry.isWorkerValid(workerId.get());
                if (!validWorker) {
                    // Fenzo still tracks a task the registry does not recognise: drop it.
                    this.taskSchedulingService.removeTask(task.getId(), ScheduleRequest.DEFAULT_Q_ATTRIBUTES, vmState.getHostname());
                    continue;
                }
                cpusUsed += task.getCPUs();
                cpusTotal += task.getCPUs();
                memoryUsed += task.getMemory();
                memoryTotal += task.getMemory();
                networkUsed += task.getNetworkMbps();
                networkTotal += task.getNetworkMbps();
            }
        }
        // With nothing available these are 0/0 = NaN, which the long casts below turn into 0.
        double cpuPercent = (cpusUsed * 100.0d) / cpusTotal;
        double memoryPercent = (memoryUsed * 100.0d) / memoryTotal;
        double networkPercent = (networkUsed * 100.0d) / networkTotal;

        this.totalAvailableCPUs.set((long) cpusTotal);
        this.totalAllocatedCPUs.set((long) cpusUsed);
        this.cpuUtilization.set((long) cpuPercent);
        this.totalAvailableMemory.set((long) memoryTotal);
        this.totalAllocatedMemory.set((long) memoryUsed);
        this.memoryUtilization.set((long) memoryPercent);
        this.totalAvailableNwMbps.set((long) networkTotal);
        this.totalAllocatedNwMbps.set((long) networkUsed);
        this.networkUtilization.set((long) networkPercent);
        this.dominantResUtilization.set((long) Math.max(Math.max(cpuPercent, memoryPercent), networkPercent));
    }

    /**
     * Shuts down the Fenzo scheduling service unless it is already shut down.
     */
    public void shutdown() {
        if (!this.taskSchedulingService.isShutdown()) {
            logger.info("shutting down Task Scheduling Service");
            this.taskSchedulingService.shutdown();
        }
    }
}
