package io.mantisrx.master.jobcluster.job;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.akka.MantisActorSupervisorStrategy;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.master.StringConstants;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.WorkerInfoListHolder;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl;
import io.mantisrx.master.jobcluster.job.MantisStageMetadataImpl;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.job.worker.JobWorker;
import io.mantisrx.master.jobcluster.job.worker.WorkerHeartbeat;
import io.mantisrx.master.jobcluster.job.worker.WorkerState;
import io.mantisrx.master.jobcluster.job.worker.WorkerStatus;
import io.mantisrx.master.jobcluster.job.worker.WorkerTerminate;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.master.jobcluster.proto.JobClusterScalerRuleProto;
import io.mantisrx.master.jobcluster.proto.JobProto;
import io.mantisrx.master.jobcluster.scaler.IJobClusterScalerRuleData;
import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.MigrationStrategy;
import io.mantisrx.runtime.WorkerMigrationConfig;
import io.mantisrx.runtime.descriptor.JobScalingRule;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.core.JobScalerRuleInfo;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WorkerAssignments;
import io.mantisrx.server.core.WorkerHost;
import io.mantisrx.server.core.domain.ArtifactID;
import io.mantisrx.server.core.domain.JobArtifact;
import io.mantisrx.server.core.domain.JobMetadata;
import io.mantisrx.server.core.domain.WorkerId;
import io.mantisrx.server.core.scheduler.SchedulingConstraints;
import io.mantisrx.server.master.agentdeploy.MigrationStrategyFactory;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.config.MasterConfiguration;
import io.mantisrx.server.master.domain.DataFormatAdapter;
import io.mantisrx.server.master.domain.IJobClusterDefinition;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.persistence.exceptions.InvalidJobException;
import io.mantisrx.server.master.persistence.exceptions.InvalidWorkerStateChangeException;
import io.mantisrx.server.master.scheduler.BatchScheduleRequest;
import io.mantisrx.server.master.scheduler.MantisScheduler;
import io.mantisrx.server.master.scheduler.ScheduleRequest;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.server.master.scheduler.WorkerOnDisabledVM;
import io.mantisrx.server.master.scheduler.WorkerUnscheduleable;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.google.common.base.Preconditions;
import io.mantisrx.shaded.com.google.common.cache.Cache;
import io.mantisrx.shaded.com.google.common.cache.CacheBuilder;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/master/jobcluster/job/JobActor.class */
public class JobActor extends AbstractActorWithTimers implements IMantisJobManager {
    private static final String CHECK_HB_TIMER_KEY = "CHECK_HB";
    private static final String REFRESH_SEND_STAGE_ASSIGNEMNTS_KEY = "REFRESH_SEND_STAGE_ASSIGNMENTS";
    private static final Logger LOGGER = LoggerFactory.getLogger(JobActor.class);
    private static final double DEFAULT_JOB_MASTER_CORES = 1.0d;
    private static final double DEFAULT_JOB_MASTER_MEM = 1024.0d;
    private static final double DEFAULT_JOB_MASTER_NW = 128.0d;
    private static final double DEFAULT_JOB_MASTER_DISK = 1024.0d;
    private final Metrics metrics;
    private final MetricGroupId metricsGroupId;
    private final Counter numWorkerResubmissions;
    private final Counter numWorkerResubmitLimitReached;
    private final Counter numScaleStage;
    private final Counter numWorkersCompletedNotTerminal;
    private final Counter numSchedulingChangesRefreshed;
    private final Counter numMissingWorkerPorts;
    private AbstractActor.Receive initializedBehavior;
    private AbstractActor.Receive activeBehavior;
    private AbstractActor.Receive terminatingBehavior;
    private AbstractActor.Receive terminatedBehavior;
    private final String clusterName;
    private final JobId jobId;
    private final IJobClusterDefinition jobClusterDefinition;
    private volatile MantisJobMetadataImpl mantisJobMetaData;
    private final MantisJobStore jobStore;
    private final MantisScheduler mantisScheduler;
    private final LifecycleEventPublisher eventPublisher;
    private final CostsCalculator costsCalculator;
    private final BehaviorSubject<JobScalerRuleInfo> scalerRuleInfoBehaviorSubject;
    private IWorkerManager workerManager = null;
    private volatile boolean allWorkersCompleted = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/job/JobActor$WorkerManager.class */
    public class WorkerManager implements IWorkerManager {
        private static final int WORKER_RESUBMIT_LIMIT = 100;
        private final WorkerNumberGenerator workerNumberGenerator;
        private final IMantisJobManager jobMgr;
        private int sinkStageNum;
        private final MigrationStrategy migrationStrategy;
        private final MantisScheduler scheduler;
        private BehaviorSubject<JobSchedulingInfo> jobSchedulingInfoBehaviorSubject;
        private String currentJobSchedulingInfoStr;
        private volatile boolean stageAssignmentPotentiallyChanged;
        private final boolean batchSchedulingEnabled;
        private final Counter numWorkerStuckInAccepted;
        private final Counter numWorkerMissingHeartbeat;
        private ObjectMapper mapper = new ObjectMapper();
        private boolean allWorkersStarted = false;
        private ConcurrentSkipListSet<Integer> workersToMigrate = new ConcurrentSkipListSet<>();
        private long lastWorkerMigrationTimestamp = Long.MIN_VALUE;
        private Map<Integer, WorkerAssignments> stageAssignments = new HashMap();
        private final WorkerResubmitRateLimiter resubmitRateLimiter = new WorkerResubmitRateLimiter();
        private Cache<Integer, Boolean> recentErrorWorkersCache = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build();

        WorkerManager(IMantisJobManager iMantisJobManager, WorkerMigrationConfig workerMigrationConfig, MantisScheduler mantisScheduler, boolean z, boolean z2, Metrics metrics) throws Exception {
            this.currentJobSchedulingInfoStr = null;
            this.numWorkerStuckInAccepted = metrics.getCounter("numWorkerStuckInAccepted");
            this.numWorkerMissingHeartbeat = metrics.getCounter("numWorkerMissingHeartbeat");
            this.workerNumberGenerator = new WorkerNumberGenerator(z ? 0 : iMantisJobManager.getJobDetails().getNextWorkerNumberToUse(), 10);
            this.scheduler = mantisScheduler;
            this.jobMgr = iMantisJobManager;
            this.batchSchedulingEnabled = z2;
            this.migrationStrategy = MigrationStrategyFactory.getStrategy(JobActor.this.jobId.getId(), workerMigrationConfig);
            int size = JobActor.this.mantisJobMetaData.getStageMetadata().size();
            if (size == 1) {
                this.sinkStageNum = 1;
            } else {
                this.sinkStageNum = size - 1;
            }
            JobSchedulingInfo jobSchedulingInfo = new JobSchedulingInfo(iMantisJobManager.getJobId().getId(), new HashMap());
            this.currentJobSchedulingInfoStr = this.mapper.writeValueAsString(jobSchedulingInfo);
            this.jobSchedulingInfoBehaviorSubject = BehaviorSubject.create(jobSchedulingInfo);
            initialize(z);
        }

        void initialize(boolean z) throws Exception {
            if (z) {
                submitInitialWorkers();
            } else {
                initializeRunningWorkers();
            }
            JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
        }

        private void initializeRunningWorkers() {
            List<JobWorker> markCorruptedWorkers = markCorruptedWorkers();
            ArrayList arrayList = new ArrayList();
            markStageAssignmentsChanged(true);
            for (IMantisStageMetadata iMantisStageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                HashMap hashMap = new HashMap();
                for (JobWorker jobWorker : iMantisStageMetadata.getAllWorkers()) {
                    IMantisWorkerMetadata metadata = jobWorker.getMetadata();
                    if (WorkerState.isRunningState(metadata.getState())) {
                        try {
                            jobWorker.processEvent(new WorkerHeartbeat(new Status(JobActor.this.jobId.getId(), iMantisStageMetadata.getStageNum(), metadata.getWorkerIndex(), metadata.getWorkerNumber(), Status.TYPE.HEARTBEAT, "", MantisJobState.Started, System.currentTimeMillis())), JobActor.this.jobStore);
                        } catch (InvalidWorkerStateChangeException | IOException e) {
                            JobActor.LOGGER.error("problem sending initial heartbeat for Job {} during initialization", jobWorker.getMetadata().getJobId(), e);
                        }
                        hashMap.put(Integer.valueOf(metadata.getWorkerNumber()), new WorkerHost(metadata.getSlave(), metadata.getWorkerIndex(), metadata.getWorkerPorts().getPorts(), DataFormatAdapter.convertWorkerStateToMantisJobState(metadata.getState()), metadata.getWorkerNumber(), metadata.getMetricsPort(), metadata.getCustomPort()));
                        this.scheduler.initializeRunningWorker(createSchedulingRequest(metadata, Optional.empty()), metadata.getSlave(), metadata.getSlaveID());
                    } else if (metadata.getState().equals(WorkerState.Accepted)) {
                        if (this.batchSchedulingEnabled && JobState.isAcceptedState(JobActor.this.mantisJobMetaData.getState())) {
                            arrayList.add(metadata);
                        } else {
                            queueTask(metadata);
                        }
                    }
                }
                if (iMantisStageMetadata.getStageNum() > 0) {
                    this.stageAssignments.put(Integer.valueOf(iMantisStageMetadata.getStageNum()), new WorkerAssignments(Integer.valueOf(iMantisStageMetadata.getStageNum()), Integer.valueOf(iMantisStageMetadata.getNumWorkers()), hashMap));
                }
            }
            if (JobState.isAcceptedState(JobActor.this.mantisJobMetaData.getState()) && !arrayList.isEmpty()) {
                queueTasks(arrayList, Optional.empty());
            }
            markStageAssignmentsChanged(true);
            for (JobWorker jobWorker2 : markCorruptedWorkers) {
                JobActor.LOGGER.warn("discovered workers with missing ports during initialization: {}", jobWorker2);
                try {
                    resubmitWorker(jobWorker2);
                } catch (Exception e2) {
                    JobActor.LOGGER.warn("Exception resubmitting worker {} during initializeRunningWorkers due to {}", new Object[]{jobWorker2, e2.getMessage(), e2});
                }
            }
        }

        private List<JobWorker> markCorruptedWorkers() {
            ArrayList arrayList = new ArrayList();
            for (IMantisStageMetadata iMantisStageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                for (JobWorker jobWorker : iMantisStageMetadata.getAllWorkers()) {
                    IMantisWorkerMetadata metadata = jobWorker.getMetadata();
                    Optional<WorkerPorts> ports = metadata.getPorts();
                    if (WorkerState.isRunningState(metadata.getState()) && !ports.isPresent()) {
                        JobActor.LOGGER.info("marking corrupted worker {} for Job ID {} as {}", new Object[]{jobWorker.getMetadata().getWorkerId(), JobActor.this.jobId, WorkerState.Failed});
                        JobActor.this.numMissingWorkerPorts.increment();
                        arrayList.add(jobWorker);
                        try {
                            jobWorker.processEvent(new WorkerStatus(new Status(JobActor.this.jobId.getId(), iMantisStageMetadata.getStageNum(), metadata.getWorkerIndex(), metadata.getWorkerNumber(), Status.TYPE.HEARTBEAT, "", MantisJobState.Failed, System.currentTimeMillis())), JobActor.this.jobStore);
                        } catch (InvalidWorkerStateChangeException | IOException e) {
                            JobActor.LOGGER.error("problem sending initial heartbeat for Job {} during initialization", jobWorker.getMetadata().getJobId(), e);
                        }
                    }
                }
            }
            return arrayList;
        }

        private void markStageAssignmentsChanged(boolean z) {
            this.stageAssignmentPotentiallyChanged = true;
            if (ConfigurationProvider.getConfig().getStageAssignmentRefreshIntervalMs() == -1 || z) {
                refreshStageAssignmentsAndPush();
            }
        }

        private void refreshStageAssignmentsAndPush() {
            if (this.stageAssignmentPotentiallyChanged) {
                ArrayList arrayList = new ArrayList();
                ArrayList arrayList2 = new ArrayList();
                for (IMantisStageMetadata iMantisStageMetadata : JobActor.this.mantisJobMetaData.getStageMetadata().values()) {
                    HashMap hashMap = new HashMap();
                    Iterator<JobWorker> it = iMantisStageMetadata.getAllWorkers().iterator();
                    while (it.hasNext()) {
                        IMantisWorkerMetadata metadata = it.next().getMetadata();
                        if (WorkerState.isRunningState(metadata.getState())) {
                            hashMap.put(Integer.valueOf(metadata.getWorkerNumber()), new WorkerHost(metadata.getSlave(), metadata.getWorkerIndex(), metadata.getWorkerPorts().getPorts(), DataFormatAdapter.convertWorkerStateToMantisJobState(metadata.getState()), metadata.getWorkerNumber(), metadata.getMetricsPort(), metadata.getCustomPort()));
                            arrayList2.add(metadata);
                            arrayList.add(metadata);
                        } else if (metadata.getState().equals(WorkerState.Accepted)) {
                            arrayList.add(metadata);
                        }
                    }
                    this.stageAssignments.put(Integer.valueOf(iMantisStageMetadata.getStageNum()), new WorkerAssignments(Integer.valueOf(iMantisStageMetadata.getStageNum()), Integer.valueOf(iMantisStageMetadata.getNumWorkers()), hashMap));
                }
                this.jobSchedulingInfoBehaviorSubject.onNext(new JobSchedulingInfo(JobActor.this.jobId.getId(), this.stageAssignments));
                JobActor.this.eventPublisher.publishWorkerListChangedEvent(new LifecycleEventsProto.WorkerListChangedEvent(new WorkerInfoListHolder(this.jobMgr.getJobId(), arrayList)));
                JobActor.this.numSchedulingChangesRefreshed.increment();
                this.stageAssignmentPotentiallyChanged = false;
            }
        }

        private void submitInitialWorkers() throws Exception {
            List<IMantisWorkerMetadata> initialWorkers = getInitialWorkers(JobActor.this.mantisJobMetaData.getJobDefinition(), System.currentTimeMillis());
            try {
                JobActor.this.jobStore.storeNewWorkers(this.jobMgr.getJobDetails(), initialWorkers);
                JobActor.LOGGER.info("Stored workers {} for Job {}", initialWorkers, JobActor.this.jobId);
                markStageAssignmentsChanged(true);
                if (!initialWorkers.isEmpty()) {
                    if (this.batchSchedulingEnabled) {
                        queueTasks(initialWorkers, Optional.empty());
                    } else {
                        initialWorkers.forEach(this::queueTask);
                    }
                }
            } catch (Exception e) {
                JobActor.LOGGER.error("Error {} storing workers of job {}", new Object[]{e.getMessage(), JobActor.this.jobId.getId(), e});
                throw new RuntimeException("Exception saving worker for Job " + JobActor.this.jobId, e);
            }
        }

        private void queueTasks(List<IMantisWorkerMetadata> list, Optional<Long> optional) {
            List list2 = (List) list.stream().map(iMantisWorkerMetadata -> {
                return createSchedulingRequest(iMantisWorkerMetadata, optional);
            }).collect(Collectors.toList());
            JobActor.LOGGER.info("Queueing up batch schedule request for {} workers", Integer.valueOf(list.size()));
            try {
                this.scheduler.scheduleWorkers(new BatchScheduleRequest(list2));
            } catch (Exception e) {
                JobActor.LOGGER.error("Exception queueing tasks", e);
            }
        }

        private void queueTask(IMantisWorkerMetadata iMantisWorkerMetadata) {
            queueTasks(Collections.singletonList(iMantisWorkerMetadata), Optional.empty());
        }

        private ScheduleRequest createSchedulingRequest(IMantisWorkerMetadata iMantisWorkerMetadata, Optional<Long> optional) {
            try {
                WorkerId workerId = iMantisWorkerMetadata.getWorkerId();
                Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(iMantisWorkerMetadata.getStageNum());
                if (!stageMetadata.isPresent()) {
                    throw new RuntimeException(String.format("No such stage %d", Integer.valueOf(iMantisWorkerMetadata.getStageNum())));
                }
                IMantisStageMetadata iMantisStageMetadata = stageMetadata.get();
                List<JobConstraints> hardConstraints = iMantisStageMetadata.getHardConstraints();
                List<JobConstraints> softConstraints = iMantisStageMetadata.getSoftConstraints();
                HashSet hashSet = new HashSet();
                if ((hardConstraints != null && !hardConstraints.isEmpty()) || (softConstraints != null && !softConstraints.isEmpty())) {
                    Iterator<JobWorker> it = iMantisStageMetadata.getAllWorkers().iterator();
                    while (it.hasNext()) {
                        if (it.next().getMetadata().getWorkerNumber() != workerId.getWorkerNum()) {
                            hashSet.add(workerId.getId());
                        }
                    }
                }
                JobMetadata jobMetadata = new JobMetadata(JobActor.this.mantisJobMetaData.getJobId().getId(), JobActor.this.mantisJobMetaData.getJobJarUrl(), JobActor.this.mantisJobMetaData.getJobDefinition().getVersion(), JobActor.this.mantisJobMetaData.getTotalStages(), JobActor.this.mantisJobMetaData.getUser(), JobActor.this.mantisJobMetaData.getSchedulingInfo(), JobActor.this.mantisJobMetaData.getParameters(), JobActor.getSubscriptionTimeoutSecs(JobActor.this.mantisJobMetaData), JobActor.getHeartbeatIntervalSecs(JobActor.this.mantisJobMetaData), JobActor.this.mantisJobMetaData.getMinRuntimeSecs());
                return new ScheduleRequest(workerId, iMantisWorkerMetadata.getStageNum(), jobMetadata, JobActor.this.mantisJobMetaData.getSla().orElse(new JobSla.Builder().build()).getDurationType(), SchedulingConstraints.of(iMantisStageMetadata.getMachineDefinition(), iMantisStageMetadata.getSizeAttribute(), mergeJobDefAndArtifactAssigmentAttributes(jobMetadata.getJobJarUrl())), optional.orElse(0L).longValue());
            } catch (Exception e) {
                JobActor.LOGGER.error("Exception creating scheduleRequest ", e);
                throw e;
            }
        }

        private Map<String, String> mergeJobDefAndArtifactAssigmentAttributes(URL url) {
            JobArtifact jobArtifact;
            try {
                Optional<String> extractArtifactBaseName = DataFormatAdapter.extractArtifactBaseName(url);
                if (extractArtifactBaseName.isPresent() && (jobArtifact = JobActor.this.jobStore.getJobArtifact(ArtifactID.of(extractArtifactBaseName.get()))) != null && jobArtifact.getTags() != null) {
                    HashMap hashMap = new HashMap(jobArtifact.getTags());
                    hashMap.putAll(JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingConstraints());
                    return hashMap;
                }
            } catch (Exception e) {
                JobActor.LOGGER.warn("Couldn't find job artifact by id: {}", url, e);
            }
            return JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingConstraints();
        }

        private List<IMantisWorkerMetadata> getInitialWorkers(JobDefinition jobDefinition, long j) throws Exception {
            LinkedList newLinkedList = Lists.newLinkedList();
            SchedulingInfo schedulingInfo = jobDefinition.getSchedulingInfo();
            int size = schedulingInfo.getStages().size();
            Iterator it = schedulingInfo.getStages().keySet().iterator();
            while (it.hasNext()) {
                newLinkedList.addAll(setupStageWorkers(schedulingInfo, size, ((Integer) it.next()).intValue(), j));
            }
            return newLinkedList;
        }

        private List<IMantisWorkerMetadata> setupStageWorkers(SchedulingInfo schedulingInfo, int i, int i2, long j) throws Exception {
            LinkedList linkedList = new LinkedList();
            StageSchedulingInfo stageSchedulingInfo = (StageSchedulingInfo) schedulingInfo.getStages().get(Integer.valueOf(i2));
            if (stageSchedulingInfo == null) {
                JobActor.LOGGER.error("StageSchedulingInfo cannot be null for Stage {}", Integer.valueOf(i2));
                throw new Exception("StageSchedulingInfo cannot be null for Stage " + i2);
            }
            int numberOfInstances = stageSchedulingInfo.getNumberOfInstances();
            int i3 = 0;
            for (int i4 = 0; i4 < numberOfInstances; i4++) {
                int i5 = i3;
                i3++;
                if (!JobActor.this.mantisJobMetaData.getStageMetadata(i2).isPresent()) {
                    IMantisStageMetadata build = new MantisStageMetadataImpl.Builder().withJobId(JobActor.this.jobId).withStageNum(i2).withNumStages(i).withMachineDefinition(stageSchedulingInfo.getMachineDefinition()).withNumWorkers(numberOfInstances).withHardConstraints(stageSchedulingInfo.getHardConstraints()).withSoftConstraints(stageSchedulingInfo.getSoftConstraints()).withScalingPolicy(stageSchedulingInfo.getScalingPolicy()).withSizeAttribute((String) Optional.ofNullable(stageSchedulingInfo.getContainerAttributes()).map(map -> {
                        return (String) map.get(StringConstants.MANTIS_STAGE_CONTAINER_SIZE_NAME_KEY);
                    }).orElse(null)).isScalable(stageSchedulingInfo.getScalable()).build();
                    JobActor.this.mantisJobMetaData.addJobStageIfAbsent(build);
                    JobActor.this.jobStore.updateStage(build);
                }
                linkedList.add(addWorker(schedulingInfo, i2, i5));
            }
            return linkedList;
        }

        private IMantisWorkerMetadata addWorker(SchedulingInfo schedulingInfo, int i, int i2) throws InvalidJobException {
            StageSchedulingInfo stageSchedulingInfo = (StageSchedulingInfo) schedulingInfo.getStages().get(Integer.valueOf(i));
            int nextWorkerNumber = this.workerNumberGenerator.getNextWorkerNumber(JobActor.this.mantisJobMetaData, JobActor.this.jobStore);
            JobWorker build = new JobWorker.Builder().withJobId(JobActor.this.jobId).withWorkerIndex(i2).withWorkerNumber(nextWorkerNumber).withNumberOfPorts(stageSchedulingInfo.getMachineDefinition().getNumPorts() + 4).withStageNum(i).withLifecycleEventsPublisher(JobActor.this.eventPublisher).build();
            if (JobActor.this.mantisJobMetaData.addWorkerMetadata(i, build)) {
                JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
                return build.getMetadata();
            }
            Optional<JobWorker> workerByIndex = JobActor.this.mantisJobMetaData.getWorkerByIndex(i, i2);
            if (workerByIndex.isPresent()) {
                throw new InvalidJobException(JobActor.this.mantisJobMetaData.getJobId().getId(), i, i2, new Exception("Couldn't add worker " + nextWorkerNumber + " as index " + i2 + ", that index already has worker " + workerByIndex.get().getMetadata().getWorkerNumber()));
            }
            throw new InvalidJobException(JobActor.this.mantisJobMetaData.getJobId().getId(), i, i2, new Exception("Couldn't add worker " + nextWorkerNumber + " as index " + i2 + "doesn't exist "));
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void shutdown() {
            this.scheduler.unscheduleJob(JobActor.this.jobId.getId());
            if (!allWorkerCompleted()) {
                terminateAllWorkersAsync();
            }
            this.jobSchedulingInfoBehaviorSubject.onNext(new JobSchedulingInfo(this.jobMgr.getJobId().getId(), new HashMap()));
            this.jobSchedulingInfoBehaviorSubject.onCompleted();
        }

        private void terminateAllWorkersAsync() {
            JobActor.LOGGER.info("Terminating all workers of job {}", JobActor.this.jobId);
            Observable.from(JobActor.this.mantisJobMetaData.getStageMetadata().values()).flatMap(iMantisStageMetadata -> {
                return Observable.from(iMantisStageMetadata.getAllWorkers());
            }).filter(jobWorker -> {
                return Boolean.valueOf(!WorkerState.isTerminalState(jobWorker.getMetadata().getState()));
            }).map(jobWorker2 -> {
                JobActor.LOGGER.info("Terminating " + jobWorker2);
                terminateWorker(jobWorker2.getMetadata(), WorkerState.Completed, JobCompletedReason.Killed);
                return jobWorker2;
            }).doOnCompleted(() -> {
                markStageAssignmentsChanged(true);
            }).subscribeOn(Schedulers.io()).subscribe();
            JobActor.LOGGER.info("Terminated all workers of job {}", JobActor.this.jobId);
        }

        private void terminateWorker(IMantisWorkerMetadata iMantisWorkerMetadata, WorkerState workerState, JobCompletedReason jobCompletedReason) {
            JobActor.LOGGER.info("Terminating  worker {} with number {}", iMantisWorkerMetadata, Integer.valueOf(iMantisWorkerMetadata.getWorkerNumber()));
            try {
                WorkerId workerId = iMantisWorkerMetadata.getWorkerId();
                this.scheduler.unscheduleAndTerminateWorker(iMantisWorkerMetadata.getWorkerId(), Optional.ofNullable(iMantisWorkerMetadata.getSlave()));
                int intValue = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap().get(Integer.valueOf(iMantisWorkerMetadata.getWorkerNumber())).intValue();
                Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(intValue);
                if (stageMetadata.isPresent()) {
                    Optional<JobWorker> processWorkerEvent = ((MantisStageMetadataImpl) stageMetadata.get()).processWorkerEvent(new WorkerTerminate(workerId, workerState, jobCompletedReason), JobActor.this.jobStore);
                    if (processWorkerEvent.isPresent()) {
                        JobActor.this.jobStore.archiveWorker(processWorkerEvent.get().getMetadata());
                        JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Terminated worker, reason: " + jobCompletedReason.name(), iMantisWorkerMetadata.getStageNum(), iMantisWorkerMetadata.getWorkerId(), iMantisWorkerMetadata.getState()));
                    }
                } else {
                    JobActor.LOGGER.error("Stage {} not found while terminating worker {}", Integer.valueOf(intValue), workerId);
                }
            } catch (Exception e) {
                JobActor.LOGGER.error("Error terminating worker {}", iMantisWorkerMetadata.getWorkerId(), e);
            }
        }

        private void terminateAndRemoveWorker(IMantisWorkerMetadata iMantisWorkerMetadata, WorkerState workerState, JobCompletedReason jobCompletedReason) {
            JobActor.LOGGER.info("Terminating and removing worker {}", iMantisWorkerMetadata.getWorkerId().getId());
            try {
                WorkerId workerId = iMantisWorkerMetadata.getWorkerId();
                int intValue = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap().get(Integer.valueOf(iMantisWorkerMetadata.getWorkerNumber())).intValue();
                Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(intValue);
                if (stageMetadata.isPresent()) {
                    WorkerTerminate workerTerminate = new WorkerTerminate(workerId, workerState, jobCompletedReason);
                    MantisStageMetadataImpl mantisStageMetadataImpl = (MantisStageMetadataImpl) stageMetadata.get();
                    mantisStageMetadataImpl.processWorkerEvent(workerTerminate, JobActor.this.jobStore);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Removing worker, reason: " + jobCompletedReason.name(), iMantisWorkerMetadata.getStageNum(), iMantisWorkerMetadata.getWorkerId(), iMantisWorkerMetadata.getState()));
                    mantisStageMetadataImpl.unsafeRemoveWorker(workerId.getWorkerIndex(), workerId.getWorkerNum(), JobActor.this.jobStore);
                    this.scheduler.unscheduleAndTerminateWorker(iMantisWorkerMetadata.getWorkerId(), Optional.ofNullable(iMantisWorkerMetadata.getSlave()));
                    JobActor.this.mantisJobMetaData.removeWorkerMetadata(iMantisWorkerMetadata.getWorkerNumber());
                    JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
                    JobActor.LOGGER.info("Terminated worker {}", iMantisWorkerMetadata);
                    markStageAssignmentsChanged(true);
                } else {
                    JobActor.LOGGER.error("Stage {} not found while terminating worker {}", Integer.valueOf(intValue), workerId);
                }
            } catch (Exception e) {
                JobActor.LOGGER.error("Error terminating worker {}", iMantisWorkerMetadata.getWorkerId(), e);
            }
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void refreshAndSendWorkerAssignments() {
            refreshStageAssignmentsAndPush();
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void checkHeartBeats(Instant instant) {
            JobActor.LOGGER.debug("Using worker timeout {} for job {}", Long.valueOf(JobActor.this.getWorkerTimeoutSecs()), this.jobMgr.getJobId());
            long workerTimeoutSecs = (long) (1.5d * JobActor.this.getWorkerTimeoutSecs());
            long workerInitTimeoutSecs = workerTimeoutSecs + ConfigurationProvider.getConfig().getWorkerInitTimeoutSecs();
            ArrayList<JobWorker> newArrayList = Lists.newArrayList();
            this.resubmitRateLimiter.expireResubmitRecords(instant.toEpochMilli());
            Iterator<? extends IMantisStageMetadata> it = JobActor.this.mantisJobMetaData.getStageMetadata().values().iterator();
            while (it.hasNext()) {
                for (JobWorker jobWorker : it.next().getAllWorkers()) {
                    IMantisWorkerMetadata metadata = jobWorker.getMetadata();
                    if (metadata.hasLaunched()) {
                        boolean z = !metadata.getLastHeartbeatAt().isPresent() && Duration.between(Instant.ofEpochSecond(metadata.getLaunchedAt()), instant).getSeconds() > workerTimeoutSecs;
                        boolean z2 = metadata.getLastHeartbeatAt().isPresent() && Duration.between(metadata.getLastHeartbeatAt().get(), instant).getSeconds() > workerTimeoutSecs;
                        if (z || z2) {
                            this.numWorkerMissingHeartbeat.increment();
                            if (metadata.getLastHeartbeatAt().isPresent()) {
                                JobActor.LOGGER.warn("Job {}, Worker {} Duration between last heartbeat and now {} missed heart beat threshold {} exceeded", new Object[]{this.jobMgr.getJobId(), metadata.getWorkerId(), Long.valueOf(Duration.between(metadata.getLastHeartbeatAt().get(), instant).getSeconds()), Long.valueOf(workerTimeoutSecs)});
                            } else {
                                JobActor.LOGGER.warn("Job {}, Worker {} hasn't received heartbeat, threshold {} exceeded", new Object[]{this.jobMgr.getJobId(), metadata.getWorkerId(), Long.valueOf(workerTimeoutSecs)});
                            }
                            if (ConfigurationProvider.getConfig().isHeartbeatTerminationEnabled()) {
                                JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "heartbeat too old, resubmitting worker", metadata.getStageNum(), metadata.getWorkerId(), metadata.getState()));
                                newArrayList.add(jobWorker);
                            } else {
                                JobActor.LOGGER.warn("Heart beat based termination is disabled. Skipping termination of worker {} Please see mantis.worker.heartbeat.termination.enabled", metadata);
                            }
                        }
                    } else {
                        Instant ofEpochMilli = Instant.ofEpochMilli(metadata.getAcceptedAt());
                        this.numWorkerStuckInAccepted.increment();
                        if (this.scheduler.schedulerHandlesAllocationRetries()) {
                            JobActor.LOGGER.warn("Job {}, Worker {} stuck in accepted state since {}, pending scheduler retry", new Object[]{this.jobMgr.getJobId(), metadata.getWorkerId(), ofEpochMilli});
                        } else if (Duration.between(ofEpochMilli, instant).getSeconds() > workerInitTimeoutSecs) {
                            JobActor.LOGGER.info("Resubmitting Job {}, Worker {} that has been stuck in accepted state for {}", new Object[]{this.jobMgr.getJobId(), metadata.getWorkerId(), Long.valueOf(Duration.between(ofEpochMilli, instant).getSeconds())});
                            newArrayList.add(jobWorker);
                            JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "worker stuck in Accepted state, resubmitting worker", metadata.getStageNum(), metadata.getWorkerId(), metadata.getState()));
                        }
                    }
                }
            }
            for (JobWorker jobWorker2 : newArrayList) {
                try {
                    resubmitWorker(jobWorker2);
                } catch (Exception e) {
                    JobActor.LOGGER.warn("Exception {} occurred resubmitting Worker {}", new Object[]{e.getMessage(), jobWorker2.getMetadata(), e});
                }
            }
            migrateDisabledVmWorkers(instant);
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void migrateDisabledVmWorkers(Instant instant) {
            if (this.workersToMigrate.isEmpty()) {
                return;
            }
            Map<Integer, Integer> workerNumberToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            List execute = this.migrationStrategy.execute(this.workersToMigrate, getNumberOfWorkersInStartedState(), getTotalWorkerCount(), this.lastWorkerMigrationTimestamp);
            if (!execute.isEmpty()) {
                JobActor.LOGGER.info("Job {} Going to migrate {} workers in this iteration", JobActor.this.jobId, Integer.valueOf(execute.size()));
            }
            execute.forEach(num -> {
                if (!workerNumberToStageMap.containsKey(num)) {
                    JobActor.LOGGER.warn("worker {} not found in workerToStageMap {} for Job {}", new Object[]{num, workerNumberToStageMap, JobActor.this.jobId});
                    return;
                }
                int intValue = ((Integer) workerNumberToStageMap.get(num)).intValue();
                Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(intValue);
                if (!stageMetadata.isPresent()) {
                    JobActor.LOGGER.warn("Stage {} Not Found. Skip move for worker {} in Job {}", new Object[]{Integer.valueOf(intValue), num, JobActor.this.jobId});
                    return;
                }
                JobWorker jobWorker = null;
                try {
                    jobWorker = stageMetadata.get().getWorkerByWorkerNumber(num.intValue());
                    IMantisWorkerMetadata metadata = jobWorker.getMetadata();
                    JobActor.LOGGER.info("Moving worker {} of job {} away from disabled VM", metadata.getWorkerId(), JobActor.this.jobId);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, " Moving out of disabled VM " + metadata.getSlave(), metadata.getStageNum(), metadata.getWorkerId(), metadata.getState()));
                    resubmitWorker(jobWorker);
                    this.lastWorkerMigrationTimestamp = System.currentTimeMillis();
                } catch (Exception e) {
                    JobActor.LOGGER.warn("Exception resubmitting worker {} during migration due to {}", new Object[]{jobWorker, e.getMessage(), e});
                }
            });
        }

        private Optional<IMantisStageMetadata> getStageForWorker(WorkerEvent workerEvent) {
            Map<Integer, Integer> workerNumberToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            if (!workerNumberToStageMap.containsKey(Integer.valueOf(workerEvent.getWorkerId().getWorkerNum()))) {
                JobActor.LOGGER.info("Event {} from Unknown worker {} ", workerEvent.getWorkerId(), workerEvent);
                return Optional.empty();
            }
            Integer num = workerNumberToStageMap.get(Integer.valueOf(workerEvent.getWorkerId().getWorkerNum()));
            Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(num.intValue());
            if (!stageMetadata.isPresent()) {
                JobActor.LOGGER.warn("Stage {} not found in Job {} while processing event {}", new Object[]{num, JobActor.this.jobId, workerEvent});
            }
            return stageMetadata;
        }

        private void terminateUnknownWorkerIfNonTerminal(WorkerEvent workerEvent) {
            if (JobHelper.isTerminalWorkerEvent(workerEvent)) {
                JobActor.LOGGER.info("Job {} Terminal event from Unknown worker {}. Ignoring", JobActor.this.jobId, workerEvent.getWorkerId());
                return;
            }
            JobActor.LOGGER.warn("Non terminal event from Unknown worker {} in Job {}. Request Termination", workerEvent.getWorkerId(), this.jobMgr.getJobId());
            this.scheduler.unscheduleAndTerminateWorker(workerEvent.getWorkerId(), JobHelper.getWorkerHostFromWorkerEvent(workerEvent));
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void processEvent(WorkerEvent workerEvent, JobState jobState) {
            Optional<JobWorker> processWorkerEvent;
            try {
                Optional<IMantisStageMetadata> stageForWorker = getStageForWorker(workerEvent);
                if (!stageForWorker.isPresent()) {
                    terminateUnknownWorkerIfNonTerminal(workerEvent);
                    return;
                }
                if (workerEvent instanceof WorkerUnscheduleable) {
                    this.scheduler.updateWorkerSchedulingReadyTime(workerEvent.getWorkerId(), this.resubmitRateLimiter.getWorkerResubmitTime(workerEvent.getWorkerId(), stageForWorker.get().getStageNum()));
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.ERROR, "rate limiting: no resources to fit worker", ((WorkerUnscheduleable) workerEvent).getStageNum(), workerEvent.getWorkerId(), WorkerState.Accepted));
                    return;
                }
                MantisStageMetadataImpl mantisStageMetadataImpl = (MantisStageMetadataImpl) stageForWorker.get();
                try {
                    if (workerEvent instanceof WorkerHeartbeat) {
                        int workerIndex = workerEvent.getWorkerId().getWorkerIndex();
                        int workerNum = workerEvent.getWorkerId().getWorkerNum();
                        int workerNumber = mantisStageMetadataImpl.getWorkerByIndex(workerIndex).getMetadata().getWorkerNumber();
                        if (workerNumber > workerNum) {
                            JobActor.LOGGER.error("[Corrupted state] StaleWorkerEvent: {}, current worker at {}, Terminate stale worker", workerEvent.getWorkerId(), Integer.valueOf(workerNumber));
                        } else if (workerNumber < workerNum) {
                            JobActor.LOGGER.error("[Corrupted state] Newer worker num received: {}, Current stage worker: {}", workerEvent, Integer.valueOf(workerNumber));
                        }
                    }
                } catch (InvalidJobException e) {
                    JobActor.LOGGER.error("Invalid job error when checking event: {}", workerEvent, e);
                }
                try {
                    processWorkerEvent = mantisStageMetadataImpl.processWorkerEvent(workerEvent, JobActor.this.jobStore);
                } catch (Exception e2) {
                    JobActor.LOGGER.warn("Exception saving worker update", e2);
                }
                if (!processWorkerEvent.isPresent()) {
                    terminateUnknownWorkerIfNonTerminal(workerEvent);
                    return;
                }
                IMantisWorkerMetadata metadata = processWorkerEvent.get().getMetadata();
                if (workerEvent instanceof WorkerOnDisabledVM) {
                    this.workersToMigrate.add(Integer.valueOf(metadata.getWorkerNumber()));
                    return;
                }
                if (WorkerState.isErrorState(metadata.getState()) && !JobState.isTerminalState(jobState)) {
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.WorkerStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "resubmitting lost worker ", metadata.getStageNum(), metadata.getWorkerId(), metadata.getState()));
                    this.recentErrorWorkersCache.put(Integer.valueOf(metadata.getWorkerNumber()), true);
                    resubmitWorker(processWorkerEvent.get());
                    return;
                }
                if (WorkerState.isTerminalState(metadata.getState())) {
                    JobActor.this.jobStore.archiveWorker(metadata);
                    JobActor.LOGGER.info("Received Worker Complete signal. Wait for all workers to complete before terminating Job {}", JobActor.this.jobId);
                }
                if (!(workerEvent instanceof WorkerHeartbeat)) {
                    markStageAssignmentsChanged(false);
                }
                if (this.allWorkersStarted || JobState.isTerminalState(jobState)) {
                    if (allWorkerCompleted()) {
                        JobActor.LOGGER.info("Job {} All workers completed", JobActor.this.jobId);
                        this.allWorkersStarted = false;
                        this.jobMgr.onAllWorkersCompleted();
                    }
                } else if (allWorkerStarted()) {
                    this.allWorkersStarted = true;
                    this.jobMgr.onAllWorkersStarted();
                    this.scheduler.unscheduleJob(JobActor.this.jobId.getId());
                    markStageAssignmentsChanged(true);
                } else if (allWorkerCompleted()) {
                    JobActor.LOGGER.info("Job {} All workers completed1", JobActor.this.jobId);
                    this.allWorkersStarted = false;
                    this.jobMgr.onAllWorkersCompleted();
                }
            } catch (Exception e3) {
                JobActor.LOGGER.error("Job {} Exception occurred in process worker event ", JobActor.this.jobId, e3);
            }
        }

        private boolean allWorkerStarted() {
            Iterator<? extends IMantisStageMetadata> it = JobActor.this.mantisJobMetaData.getStageMetadata().values().iterator();
            while (it.hasNext()) {
                if (!((MantisStageMetadataImpl) it.next()).isAllWorkerStarted()) {
                    return false;
                }
            }
            return true;
        }

        private int getNumberOfWorkersInStartedState() {
            return ((Integer) JobActor.this.mantisJobMetaData.getStageMetadata().values().stream().map(iMantisStageMetadata -> {
                return Integer.valueOf(((MantisStageMetadataImpl) iMantisStageMetadata).getNumStartedWorkers());
            }).reduce(0, (num, num2) -> {
                return Integer.valueOf(num.intValue() + num2.intValue());
            })).intValue();
        }

        private int getTotalWorkerCount() {
            return ((Integer) JobActor.this.mantisJobMetaData.getStageMetadata().values().stream().map((v0) -> {
                return v0.getNumWorkers();
            }).reduce(0, (num, num2) -> {
                return Integer.valueOf(num.intValue() + num2.intValue());
            })).intValue();
        }

        private boolean allWorkerCompleted() {
            Iterator<? extends IMantisStageMetadata> it = JobActor.this.mantisJobMetaData.getStageMetadata().values().iterator();
            while (it.hasNext()) {
                MantisStageMetadataImpl mantisStageMetadataImpl = (MantisStageMetadataImpl) it.next();
                if (mantisStageMetadataImpl.getStageNum() != 0 && !mantisStageMetadataImpl.isAllWorkerCompleted()) {
                    return false;
                }
            }
            return true;
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public void resubmitWorker(int i) throws Exception {
            Map<Integer, Integer> workerNumberToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            if (!workerNumberToStageMap.containsKey(Integer.valueOf(i))) {
                JobActor.LOGGER.warn("No such Worker number {} in Job with ID {}", Integer.valueOf(i), JobActor.this.jobId);
                throw new Exception(String.format("No such worker number %d in resubmit Worker request", Integer.valueOf(i)));
            }
            int intValue = workerNumberToStageMap.get(Integer.valueOf(i)).intValue();
            Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(intValue);
            if (!stageMetadata.isPresent()) {
                throw new Exception(String.format("Invalid stage %d in resubmit Worker request %d", Integer.valueOf(intValue), Integer.valueOf(i)));
            }
            resubmitWorker(stageMetadata.get().getWorkerByWorkerNumber(i));
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public List<IMantisWorkerMetadata> getActiveWorkers(int i) {
            List<IMantisWorkerMetadata> list = (List) JobActor.this.mantisJobMetaData.getStageMetadata().values().stream().flatMap(iMantisStageMetadata -> {
                return iMantisStageMetadata.getAllWorkers().stream();
            }).filter(jobWorker -> {
                return !WorkerState.isTerminalState(jobWorker.getMetadata().getState());
            }).map((v0) -> {
                return v0.getMetadata();
            }).collect(Collectors.toList());
            return list.size() > i ? list.subList(0, i) : list;
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public BehaviorSubject<JobSchedulingInfo> getJobStatusSubject() {
            return this.jobSchedulingInfoBehaviorSubject;
        }

        private void resubmitWorker(JobWorker jobWorker) throws Exception {
            JobActor.LOGGER.info("Resubmitting worker {}", jobWorker.getMetadata());
            Map<Integer, Integer> workerNumberToStageMap = JobActor.this.mantisJobMetaData.getWorkerNumberToStageMap();
            IMantisWorkerMetadata metadata = jobWorker.getMetadata();
            if (this.recentErrorWorkersCache.size() >= ConfigurationProvider.getConfig().getMaximumResubmissionsPerWorker()) {
                JobActor.LOGGER.error("Resubmit count exceeded");
                this.jobMgr.onTooManyWorkerResubmits();
                return;
            }
            Integer num = workerNumberToStageMap.get(Integer.valueOf(metadata.getWorkerId().getWorkerNum()));
            if (num == null) {
                String format = String.format("Stage %d not found in Job %s while resubmiting worker %s", num, JobActor.this.jobId, jobWorker);
                JobActor.LOGGER.warn(format);
                throw new Exception(format);
            }
            Optional<IMantisStageMetadata> stageMetadata = JobActor.this.mantisJobMetaData.getStageMetadata(num.intValue());
            if (!stageMetadata.isPresent()) {
                String format2 = String.format("Stage %d not found in Job %s while resubmiting worker %s", num, JobActor.this.jobId, jobWorker);
                JobActor.LOGGER.warn(format2);
                throw new Exception(format2);
            }
            MantisStageMetadataImpl mantisStageMetadataImpl = (MantisStageMetadataImpl) stageMetadata.get();
            JobWorker build = new JobWorker.Builder().withJobId(JobActor.this.jobId).withWorkerIndex(metadata.getWorkerIndex()).withWorkerNumber(this.workerNumberGenerator.getNextWorkerNumber(JobActor.this.mantisJobMetaData, JobActor.this.jobStore)).withNumberOfPorts(mantisStageMetadataImpl.getMachineDefinition().getNumPorts() + 4).withStageNum(metadata.getStageNum()).withResubmitCount(metadata.getTotalResubmitCount() + 1).withResubmitOf(metadata.getWorkerNumber()).withLifecycleEventsPublisher(JobActor.this.eventPublisher).build();
            JobActor.this.mantisJobMetaData.replaceWorkerMetaData(metadata.getStageNum(), build, jobWorker, JobActor.this.jobStore);
            JobActor.this.mantisJobMetaData.setJobCosts(JobActor.this.costsCalculator.calculateCosts(JobActor.this.mantisJobMetaData));
            this.scheduler.unscheduleAndTerminateWorker(metadata.getWorkerId(), Optional.ofNullable(metadata.getSlave()));
            Optional<Long> of = Optional.of(Long.valueOf(this.resubmitRateLimiter.getWorkerResubmitTime(build.getMetadata().getWorkerId(), mantisStageMetadataImpl.getStageNum())));
            markStageAssignmentsChanged(true);
            queueTasks(Collections.singletonList(build.getMetadata()), of);
            JobActor.LOGGER.info("Worker {} successfully queued for scheduling", build);
            JobActor.this.numWorkerResubmissions.increment();
        }

        @Override // io.mantisrx.master.jobcluster.job.IWorkerManager
        public int scaleStage(MantisStageMetadataImpl mantisStageMetadataImpl, int i, int i2, int i3, String str) {
            JobActor.LOGGER.info("Scaling stage {} to {} workers", Integer.valueOf(mantisStageMetadataImpl.getStageNum()), Integer.valueOf(i3));
            int numWorkers = mantisStageMetadataImpl.getNumWorkers();
            int max = Math.max(ConfigurationProvider.getConfig().getMaxWorkersPerStage(), i);
            int i4 = i2;
            if (mantisStageMetadataImpl.getScalingPolicy() != null) {
                max = Math.max(max, mantisStageMetadataImpl.getScalingPolicy().getMax());
                i4 = Math.min(i4, mantisStageMetadataImpl.getScalingPolicy().getMin());
            }
            int max2 = Math.max(Math.min(i3, max), i4);
            if (max2 != numWorkers) {
                try {
                    mantisStageMetadataImpl.unsafeSetNumWorkers(max2, JobActor.this.jobStore);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, String.format("Setting #workers to %d for stage %d, reason=%s", Integer.valueOf(max2), Integer.valueOf(mantisStageMetadataImpl.getStageNum()), str), JobActor.this.getJobId(), JobActor.this.getJobState()));
                    if (max2 > numWorkers) {
                        for (int i5 = 0; i5 < max2 - numWorkers; i5++) {
                            try {
                                IMantisWorkerMetadata addWorker = addWorker(JobActor.this.mantisJobMetaData.getJobDefinition().getSchedulingInfo(), mantisStageMetadataImpl.getStageNum(), numWorkers + i5);
                                JobActor.this.jobStore.storeNewWorker(addWorker);
                                markStageAssignmentsChanged(true);
                                queueTask(addWorker);
                            } catch (Exception e) {
                                JobActor.LOGGER.warn("Exception adding new worker for {}", mantisStageMetadataImpl.getJobId().getId(), e);
                            }
                        }
                    } else {
                        for (int i6 = 0; i6 < numWorkers - max2; i6++) {
                            try {
                                terminateAndRemoveWorker(mantisStageMetadataImpl.getWorkerByIndex((numWorkers - i6) - 1).getMetadata(), WorkerState.Completed, JobCompletedReason.Killed);
                            } catch (InvalidJobException e2) {
                                JobActor.LOGGER.warn("Exception terminating worker for {}", mantisStageMetadataImpl.getJobId().getId(), e2);
                            }
                        }
                    }
                } catch (Exception e3) {
                    String format = String.format("Exception updating stage %d worker count for Job %s due to %s", Integer.valueOf(mantisStageMetadataImpl.getStageNum()), JobActor.this.jobId, e3.getMessage());
                    JobActor.LOGGER.warn(format);
                    JobActor.this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, String.format("Scaling stage failed for stage %d reason: %s", Integer.valueOf(mantisStageMetadataImpl.getStageNum()), e3.getMessage()), JobActor.this.getJobId(), JobActor.this.getJobState()));
                    throw new RuntimeException(format);
                }
            }
            JobActor.LOGGER.info("{} Scaled stage to {} workers", mantisStageMetadataImpl.getJobId().getId(), Integer.valueOf(max2));
            return max2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/job/JobActor$WorkerNumberGenerator.class */
    public static class WorkerNumberGenerator {
        private static final int MAX_ATTEMPTS = 10;
        private static final int DEFAULT_INCREMENT_STEP = 10;
        private final int incrementStep;
        private int lastUsed;
        private int currLimit;
        private volatile boolean hasErrored;
        private static final Logger LOGGER = LoggerFactory.getLogger(WorkerNumberGenerator.class);
        private static final long SLEEP_DURATION_MS = Duration.ofSeconds(2).toMillis();

        WorkerNumberGenerator(int i, int i2) {
            this.hasErrored = false;
            Preconditions.checkArgument(i >= 0, "Last Used worker Number cannot be negative {} ", i);
            Preconditions.checkArgument(i2 >= 1, "incrementStepcannot be less than 1 {} ", i2);
            this.lastUsed = i;
            this.currLimit = i;
            this.incrementStep = i2;
        }

        WorkerNumberGenerator() {
            this(0, 10);
        }

        private void advance(MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore) {
            try {
                int i = this.currLimit + this.incrementStep;
                setNextWorkerNumberWithRetries(mantisJobMetadataImpl, mantisJobStore, i);
                this.currLimit = i;
            } catch (Exception e) {
                this.hasErrored = true;
                LOGGER.error("Exception setting nextWorkerNumberToUse after {} consecutive attempts", 10, e);
                throw new RuntimeException("Unexpected error setting next worker number to use", e);
            }
        }

        private void setNextWorkerNumberWithRetries(MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore, int i) throws Exception {
            Exception exc = null;
            for (int i2 = 0; i2 < 10; i2++) {
                try {
                    mantisJobMetadataImpl.setNextWorkerNumberToUse(i, mantisJobStore);
                    return;
                } catch (Exception e) {
                    LOGGER.warn("Failed to setNextWorkerNumberToUse to {} (attempt {}/{})", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), 10, e});
                    exc = e;
                    Thread.sleep(SLEEP_DURATION_MS);
                }
            }
            throw exc;
        }

        int getNextWorkerNumber(MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore) {
            if (this.hasErrored) {
                throw new IllegalStateException("Unexpected: Invalid state likely due to getting/settingnext worker number");
            }
            if (this.lastUsed == this.currLimit) {
                advance(mantisJobMetadataImpl, mantisJobStore);
            }
            int i = this.lastUsed + 1;
            this.lastUsed = i;
            return i;
        }
    }

    public static Props props(IJobClusterDefinition iJobClusterDefinition, MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore, MantisScheduler mantisScheduler, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator, IJobClusterScalerRuleData iJobClusterScalerRuleData) {
        return Props.create(JobActor.class, new Object[]{iJobClusterDefinition, mantisJobMetadataImpl, mantisJobStore, mantisScheduler, lifecycleEventPublisher, costsCalculator, iJobClusterScalerRuleData});
    }

    public static Props props(IJobClusterDefinition iJobClusterDefinition, MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore, MantisScheduler mantisScheduler, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator) {
        return Props.create(JobActor.class, new Object[]{iJobClusterDefinition, mantisJobMetadataImpl, mantisJobStore, mantisScheduler, lifecycleEventPublisher, costsCalculator, null});
    }

    public JobActor(IJobClusterDefinition iJobClusterDefinition, MantisJobMetadataImpl mantisJobMetadataImpl, MantisJobStore mantisJobStore, MantisScheduler mantisScheduler, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator, IJobClusterScalerRuleData iJobClusterScalerRuleData) {
        this.clusterName = mantisJobMetadataImpl.getClusterName();
        this.jobId = mantisJobMetadataImpl.getJobId();
        this.jobStore = mantisJobStore;
        this.jobClusterDefinition = iJobClusterDefinition;
        this.mantisScheduler = mantisScheduler;
        this.eventPublisher = lifecycleEventPublisher;
        this.mantisJobMetaData = mantisJobMetadataImpl;
        this.costsCalculator = costsCalculator;
        this.scalerRuleInfoBehaviorSubject = BehaviorSubject.create(JobScalerRuleInfo.builder().jobCompleted(false).jobId(this.jobId.getId()).rules(iJobClusterScalerRuleData == null ? null : iJobClusterScalerRuleData.getProtoRules()).build());
        this.initializedBehavior = getInitializedBehavior();
        this.activeBehavior = getActiveBehavior();
        this.terminatingBehavior = getTerminatingBehavior();
        this.terminatedBehavior = getTerminatedBehavior();
        this.metricsGroupId = getMetricGroupId(this.jobId.getId(), getResourceCluster());
        this.metrics = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().id(this.metricsGroupId).addCounter("numWorkerResubmissions").addCounter("numWorkerResubmitLimitReached").addCounter("numWorkerStuckInAccepted").addCounter("numScaleStage").addCounter("numWorkersCompletedNotTerminal").addCounter("numSchedulingChangesRefreshed").addCounter("numMissingWorkerPorts").addCounter("numWorkerMissingHeartbeat").build());
        this.numWorkerResubmissions = this.metrics.getCounter("numWorkerResubmissions");
        this.numWorkerResubmitLimitReached = this.metrics.getCounter("numWorkerResubmitLimitReached");
        this.numScaleStage = this.metrics.getCounter("numScaleStage");
        this.numWorkersCompletedNotTerminal = this.metrics.getCounter("numWorkersCompletedNotTerminal");
        this.numSchedulingChangesRefreshed = this.metrics.getCounter("numSchedulingChangesRefreshed");
        this.numMissingWorkerPorts = this.metrics.getCounter("numMissingWorkerPorts");
    }

    MetricGroupId getMetricGroupId(String str, String str2) {
        return new MetricGroupId("JobActor", new Tag[]{new BasicTag("jobId", str), new BasicTag("resourceCluster", str2)});
    }

    void initialize(boolean z) throws Exception {
        LOGGER.info("Initializing Job {}", this.jobId);
        if (z) {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job request received", getJobId(), getJobState()));
            if (isAutoscaled(this.mantisJobMetaData.getSchedulingInfo())) {
                LOGGER.info("Job is autoscaled, setting up Job Master");
                setupJobMasterStage(this.mantisJobMetaData.getSchedulingInfo());
            }
            LOGGER.info("Storing job");
            this.jobStore.storeNewJob(this.mantisJobMetaData);
        }
        LOGGER.info("Stored mantis job");
        this.workerManager = new WorkerManager(this, this.jobClusterDefinition.getWorkerMigrationConfig(), this.mantisScheduler, z, ConfigurationProvider.getConfig().isBatchSchedulingEnabled(), this.metrics);
        long workerTimeoutSecs = getWorkerTimeoutSecs();
        long stageAssignmentRefreshIntervalMs = ConfigurationProvider.getConfig().getStageAssignmentRefreshIntervalMs();
        getTimers().startPeriodicTimer(CHECK_HB_TIMER_KEY, new JobProto.CheckHeartBeat(), Duration.ofSeconds(workerTimeoutSecs));
        if (stageAssignmentRefreshIntervalMs > 0) {
            getTimers().startPeriodicTimer(REFRESH_SEND_STAGE_ASSIGNEMNTS_KEY, new JobProto.SendWorkerAssignementsIfChanged(), Duration.ofMillis(stageAssignmentRefreshIntervalMs));
        }
        this.mantisJobMetaData.getJobDefinition().getJobSla().getRuntimeLimitSecs();
        LOGGER.info("Job {} initialized", this.jobId);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getWorkerTimeoutSecs() {
        return this.mantisJobMetaData.getWorkerTimeoutSecs() > 0 ? this.mantisJobMetaData.getWorkerTimeoutSecs() : ConfigurationProvider.getConfig().getDefaultWorkerTimeoutSecs();
    }

    private void setupJobMasterStage(SchedulingInfo schedulingInfo) throws io.mantisrx.runtime.command.InvalidJobException {
        LOGGER.info("Job {} is autoscaled setting up Job Master", this.jobId);
        if (schedulingInfo.forStage(0) == null) {
            schedulingInfo.addJobMasterStage(StageSchedulingInfo.builder().numberOfInstances(1).machineDefinition(getJobMasterMachineDef()).build());
            this.mantisJobMetaData = new MantisJobMetadataImpl.Builder(this.mantisJobMetaData).withJobDefinition(new JobDefinition.Builder().from(this.mantisJobMetaData.getJobDefinition()).withSchedulingInfo(schedulingInfo).withNumberOfStages(schedulingInfo.getStages().size()).build()).build();
        }
    }

    private MachineDefinition getJobMasterMachineDef() {
        MasterConfiguration config = ConfigurationProvider.getConfig();
        return config != null ? new MachineDefinition(config.getJobMasterCores(), config.getJobMasterMemoryMB(), config.getJobMasterNetworkMbps(), config.getJobMasterDiskMB(), 1) : new MachineDefinition(DEFAULT_JOB_MASTER_CORES, 1024.0d, DEFAULT_JOB_MASTER_NW, 1024.0d, 1);
    }

    public void preStart() throws Exception {
        LOGGER.info("Job Actor {}-{} started", this.clusterName, this.jobId);
    }

    public void postStop() throws Exception {
        LOGGER.info("Job Actor {} stopped invoking cleanup logic", this.jobId);
        if (this.metricsGroupId != null) {
            MetricsRegistry.getInstance().remove(this.metricsGroupId);
        }
    }

    public SupervisorStrategy supervisorStrategy() {
        return MantisActorSupervisorStrategy.getInstance().create();
    }

    public AbstractActor.Receive createReceive() {
        return getInitializingBehavior();
    }

    private String genUnexpectedMsg(String str, String str2, String str3) {
        return String.format("Unexpected message %s received by Job actor %s in %s State", str, str2, str3);
    }

    private AbstractActor.Receive getTerminatingBehavior() {
        String str = "terminating";
        return receiveBuilder().match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor).match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers).match(WorkerEvent.class, workerEvent -> {
            LOGGER.warn("Job {} is Terminating, ignoring worker Events {}", this.jobId.getId(), workerEvent);
        }).match(JobProto.InitJob.class, initJob -> {
            getSender().tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SUCCESS, genUnexpectedMsg(initJob.toString(), this.jobId.getId(), str), this.jobId, initJob.requstor), getSelf());
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.jobId.getId(), str)), getSelf());
        }).match(JobProto.CheckHeartBeat.class, checkHeartBeat -> {
            LOGGER.warn(genUnexpectedMsg(checkHeartBeat.toString(), this.jobId.getId(), str));
        }).match(JobProto.RuntimeLimitReached.class, runtimeLimitReached -> {
            LOGGER.warn(genUnexpectedMsg(runtimeLimitReached.toString(), this.jobId.getId(), str));
        }).match(JobClusterProto.KillJobRequest.class, killJobRequest -> {
            getSender().tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.SUCCESS, JobState.Noop, genUnexpectedMsg(killJobRequest.toString(), this.jobId.getId(), str), this.jobId, killJobRequest.user), getSelf());
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.jobId.getId(), str), 0), getSelf());
        }).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, getJobSchedInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobSchedInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, getLatestJobDiscoveryInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLatestJobDiscoveryInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest.class, getJobScalerRuleStreamRequest -> {
            getSender().tell(((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.builder().responseCode(BaseResponse.ResponseCode.CLIENT_ERROR)).requestId(getJobScalerRuleStreamRequest.requestId)).message(genUnexpectedMsg(getJobScalerRuleStreamRequest.toString(), this.jobId.getId(), str))).build(), getSelf());
        }).match(JobProto.SendWorkerAssignementsIfChanged.class, sendWorkerAssignementsIfChanged -> {
            LOGGER.warn(genUnexpectedMsg(sendWorkerAssignementsIfChanged.toString(), this.jobId.getId(), str));
        }).match(JobClusterManagerProto.KillJobResponse.class, killJobResponse -> {
            LOGGER.info("Received Kill Job Response inTerminating State Ignoring");
        }).matchAny(obj -> {
            LOGGER.warn(genUnexpectedMsg(obj.toString(), this.jobId.getId(), str));
        }).build();
    }

    private AbstractActor.Receive getTerminatedBehavior() {
        String str = "terminated";
        return receiveBuilder().match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor).match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers).match(JobProto.InitJob.class, initJob -> {
            getSender().tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SUCCESS, genUnexpectedMsg(initJob.toString(), this.jobId.getId(), str), this.jobId, initJob.requstor), getSelf());
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.jobId.getId(), str)), getSelf());
        }).match(JobProto.CheckHeartBeat.class, checkHeartBeat -> {
            LOGGER.warn(genUnexpectedMsg(checkHeartBeat.toString(), this.jobId.getId(), str));
        }).match(JobProto.MigrateDisabledVmWorkersRequest.class, migrateDisabledVmWorkersRequest -> {
            LOGGER.warn(genUnexpectedMsg(migrateDisabledVmWorkersRequest.toString(), this.jobId.getId(), str));
        }).match(JobProto.RuntimeLimitReached.class, runtimeLimitReached -> {
            LOGGER.warn(genUnexpectedMsg(runtimeLimitReached.toString(), this.jobId.getId(), str));
        }).match(JobClusterProto.KillJobRequest.class, killJobRequest -> {
            getSender().tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.SUCCESS, JobState.Noop, genUnexpectedMsg(killJobRequest.toString(), this.jobId.getId(), str), this.jobId, killJobRequest.user), getSelf());
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.jobId.getId(), str), 0), getSelf());
        }).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, getJobSchedInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobSchedInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, getLatestJobDiscoveryInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLatestJobDiscoveryInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest.class, getJobScalerRuleStreamRequest -> {
            getSender().tell(((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.builder().responseCode(BaseResponse.ResponseCode.CLIENT_ERROR)).requestId(getJobScalerRuleStreamRequest.requestId)).message(genUnexpectedMsg(getJobScalerRuleStreamRequest.toString(), this.jobId.getId(), str))).build(), getSelf());
        }).match(JobClusterManagerProto.KillJobResponse.class, killJobResponse -> {
            LOGGER.info("Received Kill Job Response inTerminating State Ignoring");
        }).match(JobProto.SendWorkerAssignementsIfChanged.class, sendWorkerAssignementsIfChanged -> {
            LOGGER.warn(genUnexpectedMsg(sendWorkerAssignementsIfChanged.toString(), this.jobId.getId(), str));
        }).match(WorkerEvent.class, workerEvent -> {
            LOGGER.info("Received worker event  in Terminated State Ignoring");
        }).matchAny(obj -> {
            LOGGER.warn(genUnexpectedMsg(obj.toString(), this.jobId.getId(), str));
        }).build();
    }

    private AbstractActor.Receive getActiveBehavior() {
        String str = "active";
        return receiveBuilder().match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor).match(WorkerEvent.class, workerEvent -> {
            processWorkerEvent(workerEvent);
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, this::onResubmitWorker).match(JobProto.CheckHeartBeat.class, this::onCheckHeartBeats).match(JobProto.MigrateDisabledVmWorkersRequest.class, this::onMigrateWorkers).match(JobProto.RuntimeLimitReached.class, this::onRuntimeLimitReached).match(JobClusterProto.KillJobRequest.class, this::onJobKill).match(JobClusterManagerProto.ScaleStageRequest.class, this::onScaleStage).match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo).match(JobProto.SendWorkerAssignementsIfChanged.class, this::onSendWorkerAssignments).match(IJobClusterScalerRuleData.class, this::onScalerRuleDataUpdate).match(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest.class, this::onGetJobScalerRuleStreamRequest).match(JobProto.InitJob.class, initJob -> {
            getSender().tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SUCCESS, genUnexpectedMsg(initJob.toString(), this.jobId.getId(), str), this.jobId, initJob.requstor), getSelf());
        }).matchAny(obj -> {
            LOGGER.warn(genUnexpectedMsg(obj.toString(), this.jobId.getId(), str));
        }).build();
    }

    private AbstractActor.Receive getInitializedBehavior() {
        String str = "initialized";
        return receiveBuilder().match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetails).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, this::onGetJobDefinitionUpdatedFromJobActor).match(WorkerEvent.class, workerEvent -> {
            processWorkerEvent(workerEvent);
        }).match(JobProto.CheckHeartBeat.class, this::onCheckHeartBeats).match(JobProto.MigrateDisabledVmWorkersRequest.class, this::onMigrateWorkers).match(JobClusterProto.KillJobRequest.class, this::onJobKill).match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo).match(JobProto.SendWorkerAssignementsIfChanged.class, this::onSendWorkerAssignments).match(IJobClusterScalerRuleData.class, this::onScalerRuleDataUpdate).match(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest.class, this::onGetJobScalerRuleStreamRequest).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.jobId.getId(), str)), getSelf());
        }).match(JobProto.RuntimeLimitReached.class, runtimeLimitReached -> {
            LOGGER.warn(genUnexpectedMsg(runtimeLimitReached.toString(), this.jobId.getId(), str));
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.jobId.getId(), str), 0), getSelf());
        }).match(JobProto.InitJob.class, initJob -> {
            getSender().tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SUCCESS, genUnexpectedMsg(initJob.toString(), this.jobId.getId(), str), this.jobId, initJob.requstor), getSelf());
        }).matchAny(obj -> {
            LOGGER.warn(genUnexpectedMsg(obj.toString(), this.jobId.getId(), str));
        }).build();
    }

    private AbstractActor.Receive getInitializingBehavior() {
        String str = "initializing";
        return receiveBuilder().match(JobProto.InitJob.class, this::onJobInitialize).match(JobClusterManagerProto.GetJobDetailsRequest.class, getJobDetailsRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobDetailsRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest.class, getJobDefinitionUpdatedFromJobActorRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(getJobDefinitionUpdatedFromJobActorRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", getJobDefinitionUpdatedFromJobActorRequest.getUser(), getJobDefinitionUpdatedFromJobActorRequest.getJobDefinition(), getJobDefinitionUpdatedFromJobActorRequest.isAutoResubmit(), getJobDefinitionUpdatedFromJobActorRequest.isQuickSubmit(), getJobDefinitionUpdatedFromJobActorRequest.getOriginalSender()), getSelf());
        }).match(WorkerEvent.class, workerEvent -> {
            LOGGER.warn(genUnexpectedMsg(workerEvent.toString(), this.jobId.getId(), str));
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.jobId.getId(), str)), getSelf());
        }).match(JobProto.CheckHeartBeat.class, checkHeartBeat -> {
            LOGGER.warn(genUnexpectedMsg(checkHeartBeat.toString(), this.jobId.getId(), str));
        }).match(JobProto.MigrateDisabledVmWorkersRequest.class, migrateDisabledVmWorkersRequest -> {
            LOGGER.warn(genUnexpectedMsg(migrateDisabledVmWorkersRequest.toString(), this.jobId.getId(), str));
        }).match(JobProto.RuntimeLimitReached.class, runtimeLimitReached -> {
            LOGGER.warn(genUnexpectedMsg(runtimeLimitReached.toString(), this.jobId.getId(), str));
        }).match(JobClusterProto.KillJobRequest.class, killJobRequest -> {
            getSender().tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, genUnexpectedMsg(killJobRequest.toString(), this.jobId.getId(), str), this.jobId, killJobRequest.user), getSelf());
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.jobId.getId(), str), 0), getSelf());
        }).match(JobClusterManagerProto.ListWorkersRequest.class, listWorkersRequest -> {
            getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listWorkersRequest.toString(), this.jobId.getId(), str), Lists.newArrayList()), getSelf());
        }).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, getJobSchedInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobSchedInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, getLatestJobDiscoveryInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLatestJobDiscoveryInfoRequest.toString(), this.jobId.getId(), str), Optional.empty()), getSelf());
        }).matchAny(obj -> {
            LOGGER.warn(genUnexpectedMsg(obj.toString(), this.jobId.getId(), str));
        }).build();
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onJobInitialize(JobProto.InitJob initJob) {
        ActorRef sender = getSender();
        try {
            initialize(initJob.isSubmit);
            if (JobState.isRunningState(this.mantisJobMetaData.getState())) {
                getContext().become(this.activeBehavior);
                setRuntimeLimitTimersIfRequired(Instant.now());
            } else {
                getContext().become(this.initializedBehavior);
            }
            sender.tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("Job %s initialized successfully", this.jobId), this.jobId, initJob.requstor), getSelf());
        } catch (Exception e) {
            LOGGER.error("Exception initializing job ", e);
            sender.tell(new JobProto.JobInitialized(initJob.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "" + e.getMessage(), this.jobId, initJob.requstor), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onGetJobDetails(JobClusterManagerProto.GetJobDetailsRequest getJobDetailsRequest) {
        getSender().tell(new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(getJobDetails())), getSelf());
    }

    public void onGetJobDefinitionUpdatedFromJobActor(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest getJobDefinitionUpdatedFromJobActorRequest) {
        getSender().tell(getIntermediateJobDefinition(getJobDefinitionUpdatedFromJobActorRequest), getSelf());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest getJobSchedInfoRequest) {
        ActorRef sender = getSender();
        if (getJobSchedInfoRequest.getJobId().equals(this.jobId)) {
            sender.tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(this.workerManager.getJobStatusSubject())), getSelf());
            return;
        }
        String str = "JobId in the request " + getJobSchedInfoRequest.getJobId() + " does not match Job Actors job Id " + this.jobId;
        LOGGER.warn(str);
        sender.tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, str, Optional.empty()), getSelf());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest getLatestJobDiscoveryInfoRequest) {
        ActorRef sender = getSender();
        if (!getLatestJobDiscoveryInfoRequest.getJobCluster().equals(this.jobId.getCluster())) {
            String str = "JobCluster in the request " + getLatestJobDiscoveryInfoRequest.getJobCluster() + " does not match Job Actors job ID " + this.jobId;
            LOGGER.warn(str);
            sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, str, Optional.empty()), getSelf());
        } else {
            JobSchedulingInfo jobSchedulingInfo = (JobSchedulingInfo) this.workerManager.getJobStatusSubject().getValue();
            if (jobSchedulingInfo != null) {
                sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.ofNullable(jobSchedulingInfo)), getSelf());
            } else {
                LOGGER.info("discoveryInfo from BehaviorSubject is null {}", this.jobId);
                sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "discoveryInfo from BehaviorSubject is null " + this.jobId, Optional.empty()), getSelf());
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onGetJobScalerRuleStreamRequest(JobClusterScalerRuleProto.GetJobScalerRuleStreamRequest getJobScalerRuleStreamRequest) {
        ActorRef sender = getSender();
        if (!getJobScalerRuleStreamRequest.getJobId().equals(this.jobId)) {
            String format = String.format("JobCluster in the request %s does not match Job Actors job ID %s", getJobScalerRuleStreamRequest.getJobId(), this.jobId);
            LOGGER.error(format);
            sender.tell(((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.builder().responseCode(BaseResponse.ResponseCode.CLIENT_ERROR)).requestId(getJobScalerRuleStreamRequest.requestId)).message(format)).build(), getSelf());
        } else if (this.scalerRuleInfoBehaviorSubject != null) {
            sender.tell(((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.builder().responseCode(BaseResponse.ResponseCode.SUCCESS)).requestId(getJobScalerRuleStreamRequest.requestId)).jobScalerRuleStreamBehaviorSubject(this.scalerRuleInfoBehaviorSubject).build(), getSelf());
        } else {
            LOGGER.error("scalerRuleInfoBehaviorSubject is null in {}", this.jobId);
            sender.tell(((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) ((JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.GetJobScalerRuleStreamSubjectResponseBuilder) JobClusterScalerRuleProto.GetJobScalerRuleStreamSubjectResponse.builder().responseCode(BaseResponse.ResponseCode.SERVER_ERROR)).requestId(getJobScalerRuleStreamRequest.requestId)).message(String.format("%s has null scalerRuleInfoBehaviorSubject", this.jobId))).build(), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void processWorkerEvent(WorkerEvent workerEvent) {
        this.workerManager.processEvent(workerEvent, this.mantisJobMetaData.getState());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onResubmitWorker(JobClusterManagerProto.ResubmitWorkerRequest resubmitWorkerRequest) {
        ActorRef sender = getSender();
        try {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, resubmitWorkerRequest.getWorkerNum() + " workerNum resubmit requested by " + resubmitWorkerRequest.getUser() + " , reason: " + resubmitWorkerRequest.getReason(), getJobId(), getJobState()));
            this.workerManager.resubmitWorker(resubmitWorkerRequest.getWorkerNum());
            this.numWorkerResubmissions.increment();
            sender.tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("Worker %d of job %s resubmitted", Integer.valueOf(resubmitWorkerRequest.getWorkerNum()), resubmitWorkerRequest.getJobId())), getSelf());
        } catch (Exception e) {
            sender.tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage()), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onMigrateWorkers(JobProto.MigrateDisabledVmWorkersRequest migrateDisabledVmWorkersRequest) {
        this.workerManager.migrateDisabledVmWorkers(migrateDisabledVmWorkersRequest.time);
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onCheckHeartBeats(JobProto.CheckHeartBeat checkHeartBeat) {
        this.workerManager.checkHeartBeats(checkHeartBeat.getTime());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onRuntimeLimitReached(JobProto.RuntimeLimitReached runtimeLimitReached) {
        LOGGER.info("In onRuntimeLimitReached {} for Job {} ", Instant.now(), this.jobId);
        LOGGER.info("Job {} Started at {} and killed at {} due to Runtime limit reached", new Object[]{this.jobId, this.mantisJobMetaData.getStartedAtInstant().orElse(Instant.now()), Instant.now()});
        getContext().getParent().tell(new JobClusterProto.KillJobRequest(this.jobId, "runtime limit reached", JobCompletedReason.Killed, StringConstants.MANTIS_MASTER_USER, ActorRef.noSender()), getSelf());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onSendWorkerAssignments(JobProto.SendWorkerAssignementsIfChanged sendWorkerAssignementsIfChanged) {
        this.workerManager.refreshAndSendWorkerAssignments();
    }

    public void onScalerRuleDataUpdate(IJobClusterScalerRuleData iJobClusterScalerRuleData) {
        LOGGER.info("Job actor {} received new Scaler Rule Data: {}", this.jobId, iJobClusterScalerRuleData);
        this.scalerRuleInfoBehaviorSubject.onNext(JobScalerRuleInfo.builder().jobCompleted(false).jobId(this.jobId.getId()).rules(iJobClusterScalerRuleData == null ? null : iJobClusterScalerRuleData.getProtoRules()).build());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onJobKill(JobClusterProto.KillJobRequest killJobRequest) {
        ActorRef sender = getSender();
        LOGGER.info("Shutting down job {} on request by {}", this.jobId, sender);
        try {
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Killing job, reason: " + killJobRequest.reason, getJobId(), getJobState()));
            JobState jobState = (killJobRequest.jobCompletedReason.equals(JobCompletedReason.Error) || killJobRequest.jobCompletedReason.equals(JobCompletedReason.Lost)) ? JobState.Failed : JobState.Completed;
            updateStateAndPersist(jobState);
            sender.tell(new JobClusterProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.SUCCESS, getJobState(), getJobId() + " terminated", getJobId(), this.mantisJobMetaData, killJobRequest.user, killJobRequest.requestor), getSelf());
            getTimers().cancel(CHECK_HB_TIMER_KEY);
            getContext().become(this.terminatingBehavior);
            shutdown(jobState, killJobRequest.reason);
            performFinalShutdown();
        } catch (Exception e) {
            LOGGER.error("Failed to kill job {}", this.jobId, e);
            sender.tell(new JobClusterProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, getJobState(), getJobId() + " Could not be terminated due to " + e.getMessage(), getJobId(), this.mantisJobMetaData, killJobRequest.user, killJobRequest.requestor), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onScaleStage(JobClusterManagerProto.ScaleStageRequest scaleStageRequest) {
        LOGGER.info("In Scale stage {} for Job {}", scaleStageRequest, this.jobId);
        ActorRef sender = getSender();
        int stageNum = scaleStageRequest.getStageNum();
        Optional<IMantisStageMetadata> stageMetadata = this.mantisJobMetaData.getStageMetadata(stageNum);
        if (!stageMetadata.isPresent()) {
            LOGGER.warn("Stage {} does not exist in Job {}", Integer.valueOf(scaleStageRequest.getStageNum()), this.jobId);
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Non existent stage " + scaleStageRequest.getStageNum(), 0), getSelf());
            return;
        }
        MantisStageMetadataImpl mantisStageMetadataImpl = (MantisStageMetadataImpl) stageMetadata.get();
        if (!mantisStageMetadataImpl.getScalable()) {
            LOGGER.warn("Stage {} is not scalable in Job {}", Integer.valueOf(scaleStageRequest.getStageNum()), this.jobId);
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.WARN, "Can't change #workers to " + scaleStageRequest.getNumWorkers() + ", stage " + scaleStageRequest.getStageNum() + " is not scalable", getJobId(), getJobState()));
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Stage " + scaleStageRequest.getStageNum() + " is not scalable", 0), getSelf());
            return;
        }
        try {
            List list = (List) ((List) Optional.ofNullable(this.scalerRuleInfoBehaviorSubject.getValue()).map((v0) -> {
                return v0.getRules();
            }).orElse(Collections.emptyList())).stream().filter(jobScalingRule -> {
                return (jobScalingRule.getScalerConfig() == null || jobScalingRule.getScalerConfig().getStageConfigMap() == null) ? false : true;
            }).map(jobScalingRule2 -> {
                return jobScalingRule2.getScalerConfig().getScalerConfigByStageNum(stageNum);
            }).filter((v0) -> {
                return v0.isPresent();
            }).map(optional -> {
                return ((JobScalingRule.StageScalerConfig) optional.get()).getScalingPolicy();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList());
            int scaleStage = this.workerManager.scaleStage(mantisStageMetadataImpl, ((Integer) list.stream().max(Comparator.comparingInt((v0) -> {
                return v0.getMax();
            })).map((v0) -> {
                return v0.getMax();
            }).orElse(Integer.MAX_VALUE)).intValue(), ((Integer) list.stream().min(Comparator.comparingInt((v0) -> {
                return v0.getMin();
            })).map((v0) -> {
                return v0.getMin();
            }).orElse(0)).intValue(), scaleStageRequest.getNumWorkers(), scaleStageRequest.getReason());
            LOGGER.info("Scaled stage {} to {} workers for Job {}", new Object[]{Integer.valueOf(scaleStageRequest.getStageNum()), Integer.valueOf(scaleStage), this.jobId});
            this.numScaleStage.increment();
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("Scaled stage %d to %d workers", Integer.valueOf(scaleStageRequest.getStageNum()), Integer.valueOf(scaleStage)), scaleStage), getSelf());
        } catch (Exception e) {
            String format = String.format("Stage %d scale failed due to %s", Integer.valueOf(scaleStageRequest.getStageNum()), e.getMessage());
            LOGGER.error(format, e);
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, format, 0), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onListActiveWorkers(JobClusterManagerProto.ListWorkersRequest listWorkersRequest) {
        getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Collections.unmodifiableList(this.workerManager.getActiveWorkers(listWorkersRequest.getLimit()))), getSelf());
    }

    private void performFinalShutdown() {
        try {
            LOGGER.info("Archiving Job {}", this.jobId);
            this.jobStore.archiveJob(this.mantisJobMetaData);
        } catch (IOException e) {
            LOGGER.warn("Exception archiving job " + this.mantisJobMetaData.getJobId(), e);
        }
        getContext().become(this.terminatedBehavior);
        getSelf().tell(PoisonPill.getInstance(), ActorRef.noSender());
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void onAllWorkersCompleted() {
        LOGGER.info("JobActor: onAllWorkersCompleted with current state {}", this.mantisJobMetaData.getState());
        if (JobState.isTerminalState(this.mantisJobMetaData.getState()) || this.allWorkersCompleted) {
            LOGGER.debug("Job {} Kill already requested", this.jobId);
            return;
        }
        LOGGER.info("All workers completed but job {} in {} state. Request termination", this.jobId, getJobState());
        this.allWorkersCompleted = true;
        getContext().parent().tell(new JobClusterProto.KillJobRequest(this.jobId, "Job Completed", JobCompletedReason.Normal, StringConstants.MANTIS_MASTER_USER, ActorRef.noSender()), getSelf());
        this.numWorkersCompletedNotTerminal.increment();
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public boolean onAllWorkersStarted() {
        LOGGER.info("In onAllWorkersStarted for Job {}", this.jobId);
        boolean z = true;
        if (this.mantisJobMetaData.getState() == JobState.Accepted) {
            try {
                updateStateAndPersist(JobState.Launched);
                getContext().become(this.activeBehavior);
                this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "all workers started, job transitioning to Active", getJobId(), getJobState()));
                getContext().getParent().tell(new JobClusterProto.JobStartedEvent(getJobId()), getSelf());
                Instant now = Instant.now();
                this.mantisJobMetaData.setStartedAt(now.toEpochMilli(), this.jobStore);
                setRuntimeLimitTimersIfRequired(now);
            } catch (Exception e) {
                LOGGER.error("Error processing all worker started event ", e);
                z = false;
            }
        } else if (this.mantisJobMetaData.getState() == JobState.Launched) {
            LOGGER.info("Job is already in launched state");
            z = false;
        } else {
            LOGGER.warn("Unexpected all Workers Started Event while job in {} state", this.mantisJobMetaData.getState());
            z = false;
        }
        return z;
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public boolean onTooManyWorkerResubmits() {
        LOGGER.warn("Too many worker resubmits detected for Job {}. Requesting job shutdown", this.jobId);
        this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.ERROR, "Worker Resubmit limit reached, shutting down job", getJobId(), getJobState()));
        this.numWorkerResubmitLimitReached.increment();
        getContext().parent().tell(new JobClusterProto.KillJobRequest(this.jobId, "Too many worker resubmits", JobCompletedReason.Error, StringConstants.MANTIS_MASTER_USER, ActorRef.noSender()), getSelf());
        return true;
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public IMantisJobMetadata getJobDetails() {
        return this.mantisJobMetaData;
    }

    public JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse getIntermediateJobDefinition(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest getJobDefinitionUpdatedFromJobActorRequest) {
        JobDefinition jobDefinition = getJobDefinitionUpdatedFromJobActorRequest.getJobDefinition();
        boolean isQuickSubmit = getJobDefinitionUpdatedFromJobActorRequest.isQuickSubmit();
        MantisJobMetadataImpl mantisJobMetadataImpl = this.mantisJobMetaData;
        try {
            return new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(getJobDefinitionUpdatedFromJobActorRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", getJobDefinitionUpdatedFromJobActorRequest.getUser(), new JobDefinition.Builder().fromWithInstanceCountInheritance(jobDefinition, isQuickSubmit, num -> {
                return mantisJobMetadataImpl.getStageMetadata(num.intValue()).map((v0) -> {
                    return v0.getNumWorkers();
                });
            }).build(), getJobDefinitionUpdatedFromJobActorRequest.isAutoResubmit(), getJobDefinitionUpdatedFromJobActorRequest.isQuickSubmit(), getJobDefinitionUpdatedFromJobActorRequest.getOriginalSender());
        } catch (io.mantisrx.runtime.command.InvalidJobException e) {
            LOGGER.error("Failed to build job definition with inheritance:", e);
            return new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse(getJobDefinitionUpdatedFromJobActorRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage(), getJobDefinitionUpdatedFromJobActorRequest.getUser(), null, getJobDefinitionUpdatedFromJobActorRequest.isAutoResubmit(), getJobDefinitionUpdatedFromJobActorRequest.isQuickSubmit(), getJobDefinitionUpdatedFromJobActorRequest.getOriginalSender());
        }
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public void shutdown(JobState jobState, String str) {
        LOGGER.info("Entering JobActor:shutdown {}", this.jobId);
        this.workerManager.shutdown();
        this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "job shutdown, reason: " + str, getJobId(), jobState));
        this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_TERMINATE, this.jobId.getId(), "job shutdown, reason: " + str));
        this.scalerRuleInfoBehaviorSubject.onCompleted();
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public JobId getJobId() {
        return this.jobId;
    }

    private void updateStateAndPersist(JobState jobState) throws Exception {
        this.mantisJobMetaData.setJobState(jobState, this.jobStore);
    }

    private void setRuntimeLimitTimersIfRequired(Instant instant) {
        long runtimeLimitSecs = this.mantisJobMetaData.getJobDefinition().getJobSla().getRuntimeLimitSecs();
        Instant orElse = this.mantisJobMetaData.getStartedAtInstant().orElse(instant);
        if (runtimeLimitSecs <= 0) {
            LOGGER.info("maxRuntime for Job {} is  {} ignore ", this.jobId, Long.valueOf(this.mantisJobMetaData.getJobDefinition().getJobSla().getRuntimeLimitSecs()));
            return;
        }
        long calculateRuntimeDuration = JobHelper.calculateRuntimeDuration(runtimeLimitSecs, orElse);
        LOGGER.info("Will terminate Job {} at {} ", this.jobId, instant.plusSeconds(calculateRuntimeDuration));
        getTimers().startSingleTimer("RUNTIME_LIMIT", new JobProto.RuntimeLimitReached(), Duration.ofSeconds(calculateRuntimeDuration));
    }

    @Override // io.mantisrx.master.jobcluster.job.IMantisJobManager
    public JobState getJobState() {
        return this.mantisJobMetaData.getState();
    }

    private boolean isAutoscaled(SchedulingInfo schedulingInfo) {
        Iterator it = schedulingInfo.getStages().entrySet().iterator();
        while (it.hasNext()) {
            StageScalingPolicy scalingPolicy = ((StageSchedulingInfo) ((Map.Entry) it.next()).getValue()).getScalingPolicy();
            if (scalingPolicy != null && scalingPolicy.isEnabled()) {
                LOGGER.info("Job {} is autoscaleable", this.jobId);
                return true;
            }
        }
        LOGGER.info("Job {} is NOT scaleable", this.jobId);
        return false;
    }

    static long getSubscriptionTimeoutSecs(IMantisJobMetadata iMantisJobMetadata) {
        if (iMantisJobMetadata.getJobDefinition().getJobSla().getDurationType() == MantisJobDurationType.Perpetual) {
            return 0L;
        }
        return iMantisJobMetadata.getSubscriptionTimeoutSecs() == 0 ? ConfigurationProvider.getConfig().getEphemeralJobUnsubscribedTimeoutSecs() : iMantisJobMetadata.getSubscriptionTimeoutSecs();
    }

    static long getHeartbeatIntervalSecs(IMantisJobMetadata iMantisJobMetadata) {
        return iMantisJobMetadata.getHeartbeatIntervalSecs() > 0 ? iMantisJobMetadata.getHeartbeatIntervalSecs() : ConfigurationProvider.getConfig().getDefaultWorkerHeartbeatIntervalSecs();
    }

    private String getResourceCluster() {
        return (String) this.mantisJobMetaData.getJobDefinition().getResourceCluster().map((v0) -> {
            return v0.getResourceID();
        }).orElse("mesos");
    }
}
