package io.mantisrx.master.jobcluster;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.pattern.PatternsCS;
import com.netflix.fenzo.triggers.CronTrigger;
import com.netflix.fenzo.triggers.TriggerOperator;
import com.netflix.fenzo.triggers.exceptions.SchedulerException;
import com.netflix.fenzo.triggers.exceptions.TriggerNotFoundException;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.impl.Preconditions;
import io.mantisrx.common.Label;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.GaugeCallback;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.master.JobClustersManagerActor;
import io.mantisrx.master.StringConstants;
import io.mantisrx.master.akka.MantisActorSupervisorStrategy;
import io.mantisrx.master.api.akka.route.proto.JobClusterProtoAdapter;
import io.mantisrx.master.events.LifecycleEventPublisher;
import io.mantisrx.master.events.LifecycleEventsProto;
import io.mantisrx.master.jobcluster.JobClusterMetadataImpl;
import io.mantisrx.master.jobcluster.MantisJobClusterMetadataView;
import io.mantisrx.master.jobcluster.job.CostsCalculator;
import io.mantisrx.master.jobcluster.job.IMantisJobMetadata;
import io.mantisrx.master.jobcluster.job.JobActor;
import io.mantisrx.master.jobcluster.job.JobHelper;
import io.mantisrx.master.jobcluster.job.JobState;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataImpl;
import io.mantisrx.master.jobcluster.job.MantisJobMetadataView;
import io.mantisrx.master.jobcluster.job.worker.IMantisWorkerMetadata;
import io.mantisrx.master.jobcluster.proto.BaseResponse;
import io.mantisrx.master.jobcluster.proto.JobClusterManagerProto;
import io.mantisrx.master.jobcluster.proto.JobClusterProto;
import io.mantisrx.master.jobcluster.proto.JobProto;
import io.mantisrx.runtime.JobConstraints;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.command.InvalidJobException;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.server.core.JobCompletedReason;
import io.mantisrx.server.master.ConstraintsEvaluators;
import io.mantisrx.server.master.InvalidJobRequest;
import io.mantisrx.server.master.JobRequest;
import io.mantisrx.server.master.config.ConfigurationProvider;
import io.mantisrx.server.master.domain.IJobClusterDefinition;
import io.mantisrx.server.master.domain.JobClusterConfig;
import io.mantisrx.server.master.domain.JobClusterDefinitionImpl;
import io.mantisrx.server.master.domain.JobDefinition;
import io.mantisrx.server.master.domain.JobId;
import io.mantisrx.server.master.domain.SLA;
import io.mantisrx.server.master.persistence.MantisJobStore;
import io.mantisrx.server.master.persistence.exceptions.JobClusterAlreadyExistsException;
import io.mantisrx.server.master.scheduler.MantisSchedulerFactory;
import io.mantisrx.server.master.scheduler.WorkerEvent;
import io.mantisrx.shaded.com.google.common.base.Throwables;
import io.mantisrx.shaded.com.google.common.collect.Lists;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

/* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor.class */
public class JobClusterActor extends AbstractActorWithTimers implements IJobClusterManager {
    private static final int BOOKKEEPING_INTERVAL_SECS = 5;
    private static final String BOOKKEEPING_TIMER_KEY = "JOB_CLUSTER_BOOKKEEPING";
    private static final Integer DEFAULT_LIMIT = 100;
    private static final Integer DEFAULT_ACTIVE_JOB_LIMIT = Integer.valueOf(JobClustersManagerActor.STATE_TRANSITION_TIMEOUT_MSECS);
    private static final String CHECK_EXPIRED_TIMER_KEY = "EXPIRE_OLD_JOBS";
    private static final long EXPIRED_JOBS_CHECK_INTERVAL_SECS = 3600;
    private final Counter numJobSubmissions;
    private final Counter numJobShutdowns;
    private final Counter numJobActorCreationCounter;
    private final Counter numJobClustersInitialized;
    private final Counter numJobClusterInitializeFailures;
    private final Counter numJobsInitialized;
    private final Counter numJobSubmissionFailures;
    private final Counter numJobClusterEnable;
    private final Counter numJobClusterEnableErrors;
    private final Counter numJobClusterDisable;
    private final Counter numJobClusterDisableErrors;
    private final Counter numJobClusterDelete;
    private final Counter numJobClusterDeleteErrors;
    private final Counter numJobClusterUpdate;
    private final Counter numJobClusterUpdateErrors;
    private final Counter numSLAEnforcementExecutions;
    private final String name;
    private final MantisJobStore jobStore;
    private IJobClusterMetadata jobClusterMetadata;
    private CronManager cronManager;
    private SLAEnforcer slaEnforcer;
    private final JobManager jobManager;
    private final MantisSchedulerFactory mantisSchedulerFactory;
    private final LifecycleEventPublisher eventPublisher;
    private final Logger logger = LoggerFactory.getLogger(JobClusterActor.class);
    private final JobDefinitionResolver jobDefinitionResolver = new JobDefinitionResolver();
    private BehaviorSubject<JobId> jobIdSubmissionSubject = BehaviorSubject.create();
    private AbstractActor.Receive initializedBehavior = buildInitializedBehavior();
    private AbstractActor.Receive disabledBehavior = buildDisabledBehavior();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$CompletedJobCache.class */
    public static class CompletedJobCache {
        private final String name;
        private final LabelCache labelsCache;
        private final Logger logger = LoggerFactory.getLogger(CompletedJobCache.class);
        private final Set<JobClusterDefinitionImpl.CompletedJob> terminalSortedJobSet = new TreeSet((completedJob, completedJob2) -> {
            if (completedJob.getTerminatedAt() < completedJob2.getTerminatedAt()) {
                return 1;
            }
            return completedJob.getTerminatedAt() > completedJob2.getTerminatedAt() ? -1 : 0;
        });
        private final Map<JobId, JobClusterDefinitionImpl.CompletedJob> completedJobs = new HashMap();
        private final Map<JobId, IMantisJobMetadata> jobIdToMetadataMap = new HashMap();

        public CompletedJobCache(String str, LabelCache labelCache) {
            this.name = str;
            this.labelsCache = labelCache;
        }

        public Set<JobClusterDefinitionImpl.CompletedJob> getCompletedJobSortedSet() {
            return this.terminalSortedJobSet;
        }

        public Optional<JobClusterDefinitionImpl.CompletedJob> getCompletedJob(JobId jobId) {
            return Optional.ofNullable(this.completedJobs.getOrDefault(jobId, null));
        }

        public Optional<IMantisJobMetadata> getJobDataForCompletedJob(JobId jobId, MantisJobStore mantisJobStore) {
            return this.jobIdToMetadataMap.containsKey(jobId) ? Optional.of(this.jobIdToMetadataMap.get(jobId)) : mantisJobStore.getArchivedJob(jobId.getId());
        }

        public Set<JobId> getJobIdsMatchingLabels(List<Label> list, boolean z) {
            return this.labelsCache.getJobIdsMatchingLabels(list, z);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted(JobId jobId, Optional<IMantisJobMetadata> optional, long j, long j2, String str, String str2, JobState jobState, MantisJobStore mantisJobStore) {
            if (this.completedJobs.containsKey(jobId)) {
                this.logger.warn("Job {}  already marked completed", jobId);
                return Optional.of(this.completedJobs.get(jobId));
            }
            List arrayList = new ArrayList();
            if (optional.isPresent()) {
                arrayList = optional.get().getLabels();
            }
            JobClusterDefinitionImpl.CompletedJob completedJob = new JobClusterDefinitionImpl.CompletedJob(this.name, jobId.getId(), str2, jobState, j, j2, str, arrayList);
            this.terminalSortedJobSet.add(completedJob);
            try {
                addToCacheAndSaveCompletedJobToStore(completedJob, optional, mantisJobStore);
            } catch (Exception e) {
                this.logger.warn("Unable to save {} to completed jobs table due to {}", new Object[]{completedJob, e.getMessage(), e});
            }
            return Optional.of(completedJob);
        }

        public void purgeOldCompletedJobs(long j, MantisJobStore mantisJobStore) {
            long j2 = 0;
            int maxJobsToPurge = ConfigurationProvider.getConfig().getMaxJobsToPurge();
            long nanoTime = System.nanoTime();
            Iterator<JobClusterDefinitionImpl.CompletedJob> it = this.completedJobs.values().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                if (j2 == maxJobsToPurge) {
                    this.logger.info("{} Max clean up limit of {} reached. Stop clean up", this.name, Integer.valueOf(maxJobsToPurge));
                    break;
                }
                JobClusterDefinitionImpl.CompletedJob next = it.next();
                if (next.getTerminatedAt() < j) {
                    try {
                        this.logger.info("Purging Job {} as it was terminated at {} which is older than cutoff {}", new Object[]{next, Long.valueOf(next.getTerminatedAt()), Long.valueOf(j)});
                        this.terminalSortedJobSet.remove(next);
                        mantisJobStore.deleteJob(next.getJobId());
                        mantisJobStore.deleteCompletedJob(this.name, next.getJobId());
                        it.remove();
                        Optional<JobId> fromId = JobId.fromId(next.getJobId());
                        if (fromId.isPresent()) {
                            this.jobIdToMetadataMap.remove(fromId.get());
                            this.labelsCache.removeJobIdFromLabelCache(fromId.get());
                        }
                    } catch (Exception e) {
                        this.logger.warn("Unable to purge job {} due to {}", next, e);
                    }
                    j2++;
                } else if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Job {} was terminated at {} which is not older than cutoff {}", new Object[]{next, Long.valueOf(next.getTerminatedAt()), Long.valueOf(j)});
                }
            }
            if (j2 > 0) {
                this.logger.info("Took {} micros to clean up {} jobs in cluster {} ", new Object[]{Long.valueOf((System.nanoTime() - nanoTime) / 1000), Long.valueOf(j2), this.name});
            }
        }

        void forcePurgeCompletedJobs(MantisJobStore mantisJobStore) {
            Iterator<JobClusterDefinitionImpl.CompletedJob> it = this.completedJobs.values().iterator();
            while (it.hasNext()) {
                JobClusterDefinitionImpl.CompletedJob next = it.next();
                try {
                    this.logger.info("Purging Job {} during job cluster cleanup", next);
                    this.terminalSortedJobSet.remove(next);
                    mantisJobStore.deleteJob(next.getJobId());
                    mantisJobStore.deleteCompletedJob(this.name, next.getJobId());
                    it.remove();
                    Optional<JobId> fromId = JobId.fromId(next.getJobId());
                    if (fromId.isPresent()) {
                        this.jobIdToMetadataMap.remove(fromId.get());
                        this.labelsCache.removeJobIdFromLabelCache(fromId.get());
                    }
                } catch (Exception e) {
                    this.logger.warn("Unable to purge job {} due to {}", next, e);
                }
            }
        }

        public void persistToCompletedJobAndArchiveJobTables(IMantisJobMetadata iMantisJobMetadata, MantisJobStore mantisJobStore) {
            try {
                addToCacheAndSaveCompletedJobToStore(new JobClusterDefinitionImpl.CompletedJob(this.name, iMantisJobMetadata.getJobId().getId(), null, iMantisJobMetadata.getState(), iMantisJobMetadata.getSubmittedAtInstant().toEpochMilli(), iMantisJobMetadata.getEndedAtInstant().orElse(Instant.now()).toEpochMilli(), iMantisJobMetadata.getUser(), iMantisJobMetadata.getLabels()), Optional.of(iMantisJobMetadata), mantisJobStore);
                mantisJobStore.archiveJob(iMantisJobMetadata);
            } catch (Exception e) {
                this.logger.warn("Unable to save completed job {} to store due to {}", iMantisJobMetadata, e);
            }
        }

        private void addToCacheAndSaveCompletedJobToStore(JobClusterDefinitionImpl.CompletedJob completedJob, Optional<IMantisJobMetadata> optional, MantisJobStore mantisJobStore) throws Exception {
            Optional<JobId> fromId = JobId.fromId(completedJob.getJobId());
            if (!fromId.isPresent()) {
                this.logger.warn("Invalid job id {} in addToCAcheAndSaveCompletedJobToStore ", completedJob);
                return;
            }
            this.labelsCache.addJobIdToLabelCache(fromId.get(), completedJob.getLabelList());
            this.completedJobs.put(fromId.get(), completedJob);
            this.terminalSortedJobSet.add(completedJob);
            if (optional.isPresent()) {
                this.jobIdToMetadataMap.put(fromId.get(), optional.get());
            }
            mantisJobStore.storeCompletedJobForCluster(this.name, completedJob);
        }

        public void addCompletedJobsToCache(List<JobClusterDefinitionImpl.CompletedJob> list) {
            if (list == null) {
                this.logger.warn("addCompletedJobsToCache called with null completedJobsList");
            } else {
                this.terminalSortedJobSet.addAll(list);
                list.forEach(completedJob -> {
                    Optional<JobId> fromId = JobId.fromId(completedJob.getJobId());
                    if (!fromId.isPresent()) {
                        this.logger.warn("Invalid job Id {}", completedJob.getJobId());
                    } else {
                        this.completedJobs.put(fromId.get(), completedJob);
                        this.labelsCache.addJobIdToLabelCache(fromId.get(), completedJob.getLabelList());
                    }
                });
            }
        }

        public boolean containsKey(JobId jobId) {
            return this.completedJobs.containsKey(jobId);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$CronManager.class */
    public static class CronManager {
        private final String cronSpec;
        private final IJobClusterDefinition.CronPolicy policy;
        private final ActorRef clusterActor;
        private String triggerId;
        private final String jobClusterName;
        private CronTrigger<ActorRef> scheduledTrigger;
        private static final Logger logger = LoggerFactory.getLogger(CronManager.class);
        private static final TriggerOperator triggerOperator = new TriggerOperator(1);
        private String triggerGroup = null;
        private boolean isCronActive = false;

        CronManager(String str, ActorRef actorRef, SLA sla) throws Exception {
            this.jobClusterName = str;
            this.cronSpec = sla.getCronSpec();
            this.policy = sla.getCronPolicy();
            this.clusterActor = actorRef;
            if (this.cronSpec != null) {
                initCron();
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void initCron() throws Exception {
            if (this.cronSpec == null || this.triggerId != null) {
                return;
            }
            logger.info("Init'ing cron for " + this.jobClusterName);
            this.triggerGroup = this.jobClusterName + "-" + this;
            try {
                this.scheduledTrigger = new CronTrigger<>(this.cronSpec, this.jobClusterName, this.clusterActor, ActorRef.class, CronTriggerAction.class);
                this.triggerId = triggerOperator.registerTrigger(this.triggerGroup, this.scheduledTrigger);
                this.isCronActive = true;
            } catch (IllegalArgumentException e) {
                destroyCron();
                logger.error("Failed to start cron for {}: {}", this.jobClusterName, e);
                throw new SchedulerException(e.getMessage(), e);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void destroyCron() {
            try {
                if (this.triggerId != null) {
                    logger.info("Destroying cron " + this.triggerId);
                    this.triggerId = null;
                    this.isCronActive = false;
                    triggerOperator.deleteTrigger(this.triggerGroup, this.triggerId);
                }
            } catch (TriggerNotFoundException | SchedulerException e) {
                logger.warn("Couldn't delete trigger group " + this.triggerGroup + ", id " + this.triggerId);
            }
        }

        boolean isCronActive() {
            return this.isCronActive;
        }

        static {
            try {
                triggerOperator.initialize();
            } catch (SchedulerException e) {
                logger.error("Unexpected: {}", e.getMessage(), e);
                throw new RuntimeException((Throwable) e);
            }
        }
    }

    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$CronTriggerAction.class */
    public static class CronTriggerAction implements Action1<ActorRef> {
        public void call(ActorRef actorRef) {
            actorRef.tell(new JobClusterProto.TriggerCronRequest(), ActorRef.noSender());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$JobInfo.class */
    public static final class JobInfo {
        final long submittedAt;
        public String version;
        volatile long initializeInitiatedAt;
        volatile long initializedAt;
        volatile long terminationInitiatedAt;
        volatile long terminatedAt;
        final JobId jobId;
        final ActorRef jobActor;
        volatile JobState state;
        final String user;
        final JobDefinition jobDefinition;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$JobInfo$Builder.class */
        public static class Builder {
            long submittedAt = -1;
            long initializeInitiatedAt = -1;
            long initializedAt = -1;
            JobId jobId = null;
            ActorRef jobActor = null;
            JobState state = null;
            String user = "";
            JobDefinition jobDefinition = null;

            Builder() {
            }

            Builder withSubmittedAt(long j) {
                this.submittedAt = j;
                return this;
            }

            Builder withInitializeInitiatedAt(long j) {
                this.initializeInitiatedAt = j;
                return this;
            }

            Builder withInitializedAt(long j) {
                this.initializedAt = j;
                return this;
            }

            Builder withJobId(JobId jobId) {
                this.jobId = jobId;
                return this;
            }

            Builder withJobActor(ActorRef actorRef) {
                this.jobActor = actorRef;
                return this;
            }

            Builder withJobDefinition(JobDefinition jobDefinition) {
                this.jobDefinition = jobDefinition;
                return this;
            }

            Builder withUser(String str) {
                this.user = str;
                return this;
            }

            Builder withState(JobState jobState) {
                this.state = jobState;
                return this;
            }

            Builder usingJobMetadata(MantisJobMetadataImpl mantisJobMetadataImpl, ActorRef actorRef) {
                this.jobId = mantisJobMetadataImpl.getJobId();
                this.jobDefinition = mantisJobMetadataImpl.getJobDefinition();
                this.submittedAt = mantisJobMetadataImpl.getSubmittedAtInstant().toEpochMilli();
                this.state = mantisJobMetadataImpl.getState();
                this.user = mantisJobMetadataImpl.getUser();
                this.jobActor = actorRef;
                return this;
            }

            JobInfo build() {
                Preconditions.checkNotNull(this.jobId, "JobId cannot be null");
                Preconditions.checkNotNull(this.jobDefinition, "JobDefinition cannot be null");
                Preconditions.checkNotNull(this.state, "state cannot be null");
                Preconditions.checkNotNull(this.jobActor, "Job Actor cannot be null");
                return new JobInfo(this.jobId, this.jobDefinition, this.submittedAt, this.jobActor, this.state, this.user, this.initializeInitiatedAt, this.initializedAt);
            }
        }

        JobInfo(JobId jobId, JobDefinition jobDefinition, long j, ActorRef actorRef, JobState jobState, String str, long j2, long j3) {
            this.initializeInitiatedAt = -1L;
            this.initializedAt = -1L;
            this.terminationInitiatedAt = -1L;
            this.terminatedAt = -1L;
            this.submittedAt = j;
            this.jobActor = actorRef;
            this.jobId = jobId;
            this.state = jobState;
            this.user = str;
            this.jobDefinition = jobDefinition;
            this.initializeInitiatedAt = j2;
            this.initializedAt = j3;
        }

        public String toString() {
            return "JobInfo{submittedAt=" + this.submittedAt + ", initializeInitiatedAt=" + this.initializeInitiatedAt + ", initializedAt=" + this.initializedAt + ", terminationInitiatedAt=" + this.terminationInitiatedAt + ", terminatedAt=" + this.terminatedAt + ", jobId=" + this.jobId + ", jobActor=" + this.jobActor + ", state=" + this.state + ", user='" + this.user + "', jobDefinition=" + this.jobDefinition + '}';
        }

        void setInitializeInitiatedAt(long j) {
            this.initializeInitiatedAt = j;
        }

        void setInitializedAt(long j) {
            this.initializedAt = j;
        }

        void setState(JobState jobState) {
            this.state = jobState;
        }

        void setTerminationInitiatedAt(long j) {
            this.terminationInitiatedAt = j;
        }

        public void setTerminatedAt(long j) {
            this.terminatedAt = j;
        }

        JobInfo(JobId jobId, JobDefinition jobDefinition, long j, ActorRef actorRef, JobState jobState, String str) {
            this(jobId, jobDefinition, j, actorRef, jobState, str, -1L, -1L);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$JobManager.class */
    public static final class JobManager {
        private final String name;
        private final CompletedJobCache completedJobsCache;
        private final AbstractActor.ActorContext context;
        private final MantisSchedulerFactory scheduler;
        private final LifecycleEventPublisher publisher;
        private final MantisJobStore jobStore;
        private final CostsCalculator costsCalculator;
        private final Logger logger = LoggerFactory.getLogger(JobManager.class);
        private final Map<ActorRef, JobId> actorToJobIdMap = new HashMap();
        private final ConcurrentMap<JobId, JobInfo> pendingInitializationJobsMap = new ConcurrentHashMap();
        private final ConcurrentMap<JobId, JobInfo> activeJobsMap = new ConcurrentHashMap();
        private final ConcurrentMap<JobId, JobInfo> acceptedJobsMap = new ConcurrentHashMap();
        private final Set<JobInfo> nonTerminalSortedJobSet = new TreeSet((jobInfo, jobInfo2) -> {
            if (jobInfo.submittedAt < jobInfo2.submittedAt) {
                return 1;
            }
            return jobInfo.submittedAt > jobInfo2.submittedAt ? -1 : 0;
        });
        private final Map<JobId, JobInfo> terminatingJobsMap = new HashMap();
        private final LabelCache labelCache = new LabelCache();

        JobManager(String str, AbstractActor.ActorContext actorContext, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher lifecycleEventPublisher, MantisJobStore mantisJobStore, CostsCalculator costsCalculator) {
            this.name = str;
            this.jobStore = mantisJobStore;
            this.context = actorContext;
            this.scheduler = mantisSchedulerFactory;
            this.publisher = lifecycleEventPublisher;
            this.completedJobsCache = new CompletedJobCache(this.name, this.labelCache);
            this.costsCalculator = costsCalculator;
        }

        public void purgeOldCompletedJobs(long j) {
            this.completedJobsCache.purgeOldCompletedJobs(j, this.jobStore);
        }

        public void cleanupAllCompletedJobs() {
            this.completedJobsCache.forcePurgeCompletedJobs(this.jobStore);
        }

        Observable<JobProto.JobInitialized> bootstrapJob(MantisJobMetadataImpl mantisJobMetadataImpl, IJobClusterMetadata iJobClusterMetadata) {
            JobInfo createJobInfoAndActorAndWatchActor = createJobInfoAndActorAndWatchActor(mantisJobMetadataImpl, iJobClusterMetadata);
            this.actorToJobIdMap.put(createJobInfoAndActorAndWatchActor.jobActor, createJobInfoAndActorAndWatchActor.jobId);
            if (createJobInfoAndActorAndWatchActor.state.equals(JobState.Accepted)) {
                this.acceptedJobsMap.put(createJobInfoAndActorAndWatchActor.jobId, createJobInfoAndActorAndWatchActor);
                this.nonTerminalSortedJobSet.add(createJobInfoAndActorAndWatchActor);
            } else if (createJobInfoAndActorAndWatchActor.state.equals(JobState.Launched)) {
                this.activeJobsMap.put(createJobInfoAndActorAndWatchActor.jobId, createJobInfoAndActorAndWatchActor);
                this.nonTerminalSortedJobSet.add(createJobInfoAndActorAndWatchActor);
            } else if (createJobInfoAndActorAndWatchActor.state.equals(JobState.Terminating_abnormal) || createJobInfoAndActorAndWatchActor.state.equals(JobState.Terminating_normal)) {
                this.terminatingJobsMap.put(createJobInfoAndActorAndWatchActor.jobId, createJobInfoAndActorAndWatchActor);
                this.nonTerminalSortedJobSet.add(createJobInfoAndActorAndWatchActor);
            } else {
                this.logger.warn("Unexpected job state {}", createJobInfoAndActorAndWatchActor.state);
            }
            long masterInitTimeoutSecs = ConfigurationProvider.getConfig().getMasterInitTimeoutSecs();
            Duration ofSeconds = Duration.ofSeconds(masterInitTimeoutSecs - 60 > 0 ? masterInitTimeoutSecs - 60 : masterInitTimeoutSecs);
            markJobInitializeInitiated(createJobInfoAndActorAndWatchActor, System.currentTimeMillis());
            CompletionStage ask = PatternsCS.ask(createJobInfoAndActorAndWatchActor.jobActor, new JobProto.InitJob(ActorRef.noSender(), false), ofSeconds);
            Class<JobProto.JobInitialized> cls = JobProto.JobInitialized.class;
            JobProto.JobInitialized.class.getClass();
            return Observable.from(ask.thenApply(cls::cast).toCompletableFuture(), Schedulers.io()).onErrorResumeNext(th -> {
                this.logger.warn("caught exception {}", th.getMessage(), th);
                return Observable.just(new JobProto.JobInitialized(1L, BaseResponse.ResponseCode.SERVER_ERROR, "Timeout initializing Job " + createJobInfoAndActorAndWatchActor.jobId + " exception -> " + th.getMessage(), createJobInfoAndActorAndWatchActor.jobId, ActorRef.noSender()));
            }).map(jobInitialized -> {
                markJobInitialized(jobInitialized.jobId, System.currentTimeMillis());
                return jobInitialized;
            });
        }

        JobInfo initJob(MantisJobMetadataImpl mantisJobMetadataImpl, IJobClusterMetadata iJobClusterMetadata, ActorRef actorRef) {
            JobInfo createJobInfoAndActorAndWatchActor = createJobInfoAndActorAndWatchActor(mantisJobMetadataImpl, iJobClusterMetadata);
            markJobAccepted(createJobInfoAndActorAndWatchActor);
            createJobInfoAndActorAndWatchActor.jobActor.tell(new JobProto.InitJob(actorRef, true), this.context.self());
            markJobInitializeInitiated(createJobInfoAndActorAndWatchActor, System.currentTimeMillis());
            return createJobInfoAndActorAndWatchActor;
        }

        JobInfo createJobInfoAndActorAndWatchActor(MantisJobMetadataImpl mantisJobMetadataImpl, IJobClusterMetadata iJobClusterMetadata) {
            ActorRef actorOf = this.context.actorOf(JobActor.props(iJobClusterMetadata.getJobClusterDefinition(), mantisJobMetadataImpl, this.jobStore, this.scheduler.forJob(mantisJobMetadataImpl.getJobDefinition()), this.publisher, this.costsCalculator), "JobActor-" + mantisJobMetadataImpl.getJobId().getId());
            this.context.watch(actorOf);
            this.labelCache.addJobIdToLabelCache(mantisJobMetadataImpl.getJobId(), mantisJobMetadataImpl.getLabels());
            return new JobInfo.Builder().usingJobMetadata(mantisJobMetadataImpl, actorOf).build();
        }

        void markJobInitialized(JobId jobId, long j) {
            JobInfo remove = this.pendingInitializationJobsMap.remove(jobId);
            if (remove != null) {
                remove.setInitializedAt(j);
            }
        }

        void markJobInitializeInitiated(JobInfo jobInfo, long j) {
            jobInfo.setInitializeInitiatedAt(j);
            this.pendingInitializationJobsMap.put(jobInfo.jobId, jobInfo);
        }

        void persistToCompletedJobAndArchiveJobTables(IMantisJobMetadata iMantisJobMetadata) {
            this.completedJobsCache.persistToCompletedJobAndArchiveJobTables(iMantisJobMetadata, this.jobStore);
        }

        void addCompletedJobsToCache(List<JobClusterDefinitionImpl.CompletedJob> list) {
            this.completedJobsCache.addCompletedJobsToCache(list);
        }

        boolean markJobAccepted(JobInfo jobInfo) {
            boolean z = false;
            if (!jobInfo.state.isValidStateChgTo(JobState.Accepted) || this.activeJobsMap.containsKey(jobInfo.jobId) || this.terminatingJobsMap.containsKey(jobInfo.jobId) || this.completedJobsCache.containsKey(jobInfo.jobId)) {
                this.logger.warn(String.format("Job %s already exists", jobInfo.jobId));
            } else {
                this.acceptedJobsMap.put(jobInfo.jobId, jobInfo);
                this.actorToJobIdMap.put(jobInfo.jobActor, jobInfo.jobId);
                this.nonTerminalSortedJobSet.add(jobInfo);
                z = true;
            }
            return z;
        }

        List<JobInfo> getPendingInitializationJobsPriorToCutoff(long j) {
            return (List) this.pendingInitializationJobsMap.values().stream().filter(jobInfo -> {
                return jobInfo.initializedAt == -1 && jobInfo.initializeInitiatedAt < j;
            }).collect(Collectors.toList());
        }

        boolean markJobTerminating(JobInfo jobInfo, JobState jobState) {
            boolean z = false;
            if (JobState.isTerminalState(jobState) && jobInfo.state.isValidStateChgTo(jobState)) {
                this.activeJobsMap.remove(jobInfo.jobId);
                this.acceptedJobsMap.remove(jobInfo.jobId);
                this.nonTerminalSortedJobSet.add(jobInfo);
                jobInfo.setState(jobState);
                this.terminatingJobsMap.put(jobInfo.jobId, jobInfo);
                jobInfo.setTerminationInitiatedAt(System.currentTimeMillis());
                z = true;
            } else {
                this.logger.warn("Unexpected job terminating event " + jobInfo.jobId + " Invalid transition from state " + jobInfo.state + " to state " + jobState + " ");
            }
            return z;
        }

        boolean markJobStarted(JobInfo jobInfo) {
            boolean z = false;
            if (jobInfo.state.isValidStateChgTo(JobState.Launched)) {
                jobInfo.setState(JobState.Launched);
                this.acceptedJobsMap.remove(jobInfo.jobId);
                this.activeJobsMap.put(jobInfo.jobId, jobInfo);
                this.nonTerminalSortedJobSet.add(jobInfo);
                z = true;
            } else {
                this.logger.warn(String.format("Unexpected job started event %s Invalid transition from state %s to state %s", jobInfo.jobId, jobInfo.state, JobState.Launched));
            }
            return z;
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted(JobId jobId, Optional<IMantisJobMetadata> optional, JobState jobState) {
            return markCompleted(jobId, System.currentTimeMillis(), optional, jobState);
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted(JobId jobId, long j, Optional<IMantisJobMetadata> optional, JobState jobState) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Enter markCompleted job {}", jobId);
            }
            Optional<JobInfo> jobInfoForNonTerminalJob = getJobInfoForNonTerminalJob(jobId);
            if (!jobInfoForNonTerminalJob.isPresent()) {
                this.logger.warn("No such job {}", jobId);
                return Optional.empty();
            }
            JobInfo jobInfo = jobInfoForNonTerminalJob.get();
            jobInfo.state = jobState;
            jobInfo.setTerminatedAt(j);
            this.acceptedJobsMap.remove(jobId);
            this.terminatingJobsMap.remove(jobId);
            this.activeJobsMap.remove(jobId);
            this.actorToJobIdMap.remove(jobInfoForNonTerminalJob.get().jobActor);
            this.nonTerminalSortedJobSet.remove(jobInfo);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit markCompleted job {}", jobId);
            }
            JobState jobState2 = JobState.Completed;
            String str = null;
            if (optional.isPresent()) {
                jobState2 = optional.get().getState();
                str = optional.get().getJobDefinition().getVersion();
            }
            return this.completedJobsCache.markCompleted(jobId, optional, jobInfo.submittedAt, j, jobInfo.user, str, jobState2, this.jobStore);
        }

        void markCompletedDuringStartup(JobId jobId, long j, IMantisJobMetadata iMantisJobMetadata, JobState jobState) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Enter markCompletedDuringStartup job {}", jobId);
            }
            this.completedJobsCache.markCompleted(jobId, Optional.of(iMantisJobMetadata), iMantisJobMetadata.getSubmittedAtInstant().toEpochMilli(), j, iMantisJobMetadata.getUser(), iMantisJobMetadata.getJobDefinition().getVersion(), JobState.isTerminalState(iMantisJobMetadata.getState()) ? iMantisJobMetadata.getState() : JobState.Completed, this.jobStore);
        }

        List<JobInfo> getAllNonTerminalJobsList() {
            ArrayList arrayList = new ArrayList(this.nonTerminalSortedJobSet);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exiting JobClusterActor:getAllNonTerminatlJobsList {}", arrayList);
            }
            return arrayList;
        }

        List<JobInfo> getAcceptedJobsList() {
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(acceptedJobsCount());
            newArrayListWithExpectedSize.addAll(this.acceptedJobsMap.values());
            return Collections.unmodifiableList(newArrayListWithExpectedSize);
        }

        List<JobInfo> getActiveJobsList() {
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.activeJobsMap.size());
            newArrayListWithExpectedSize.addAll(this.activeJobsMap.values());
            return Collections.unmodifiableList(newArrayListWithExpectedSize);
        }

        List<JobClusterDefinitionImpl.CompletedJob> getCompletedJobsList() {
            return new ArrayList(this.completedJobsCache.getCompletedJobSortedSet());
        }

        List<JobInfo> getTerminatingJobsList() {
            ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(this.terminatingJobsMap.size());
            newArrayListWithExpectedSize.addAll(this.terminatingJobsMap.values());
            return Collections.unmodifiableList(newArrayListWithExpectedSize);
        }

        int acceptedJobsCount() {
            return this.acceptedJobsMap.size();
        }

        int activeJobsCount() {
            return this.activeJobsMap.size();
        }

        Optional<JobClusterDefinitionImpl.CompletedJob> getCompletedJob(JobId jobId) {
            return this.completedJobsCache.getCompletedJob(jobId);
        }

        Optional<IMantisJobMetadata> getJobDataForCompletedJob(String str) {
            Optional<JobId> fromId = JobId.fromId(str);
            if (fromId.isPresent()) {
                return this.completedJobsCache.getJobDataForCompletedJob(fromId.get(), this.jobStore);
            }
            this.logger.warn("Invalid Job Id {} in getJobDataForCompletedJob", str);
            return Optional.empty();
        }

        Optional<JobInfo> getJobInfoForNonTerminalJob(JobId jobId) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getJobInfo {}", jobId);
            }
            if (this.acceptedJobsMap.containsKey(jobId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Found {} in accepted state", jobId);
                }
                return Optional.of(this.acceptedJobsMap.get(jobId));
            }
            if (this.activeJobsMap.containsKey(jobId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Found {} in active state", jobId);
                }
                return Optional.of(this.activeJobsMap.get(jobId));
            }
            if (!this.terminatingJobsMap.containsKey(jobId)) {
                return Optional.empty();
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Found {} in terminating state", jobId);
            }
            return Optional.of(this.terminatingJobsMap.get(jobId));
        }

        Optional<JobInfo> getJobInfoForNonTerminalJob(String str) {
            Optional<JobId> fromId = JobId.fromId(str);
            return fromId.isPresent() ? getJobInfoForNonTerminalJob(fromId.get()) : Optional.empty();
        }

        Optional<JobInfo> getJobInfoByUniqueId(String str) {
            return getAllNonTerminalJobsList().stream().filter(jobInfo -> {
                String userProvidedType = jobInfo.jobDefinition.getJobSla().getUserProvidedType();
                return (userProvidedType == null || userProvidedType.isEmpty() || !userProvidedType.equals(str)) ? false : true;
            }).findFirst();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<JobInfo> getJobActorsStuckInInit(long j, long j2) {
            return (List) getPendingInitializationJobsPriorToCutoff(j - j2).stream().peek(jobInfo -> {
                this.logger.warn("Job {} waiting for initialization since {}", jobInfo.jobId, Long.valueOf(jobInfo.initializeInitiatedAt));
            }).collect(Collectors.toList());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<JobInfo> getJobsStuckInAccepted(long j, long j2) {
            return (List) getAcceptedJobsList().stream().filter(jobInfo -> {
                return jobInfo.submittedAt < j - j2;
            }).peek(jobInfo2 -> {
                this.logger.warn("Job {} stuck in accepted since {}", jobInfo2.jobId, Instant.ofEpochMilli(jobInfo2.submittedAt));
            }).collect(Collectors.toList());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<JobInfo> getJobsStuckInTerminating(long j, long j2) {
            return (List) getTerminatingJobsList().stream().filter(jobInfo -> {
                return jobInfo.terminationInitiatedAt < j - j2;
            }).peek(jobInfo2 -> {
                this.logger.warn("Job {} stuck in terminating since {}", jobInfo2.jobId, Instant.ofEpochMilli(jobInfo2.terminationInitiatedAt));
            }).collect(Collectors.toList());
        }

        boolean isJobListEmpty() {
            return this.activeJobsMap.isEmpty() && this.acceptedJobsMap.isEmpty();
        }

        public Set<JobId> getJobsMatchingLabels(List<Label> list, Optional<String> optional) {
            boolean z = false;
            if (optional.isPresent() && optional.get().equalsIgnoreCase("and")) {
                z = true;
            }
            return this.labelCache.getJobIdsMatchingLabels(list, z);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/mantisrx/master/jobcluster/JobClusterActor$LabelCache.class */
    public static final class LabelCache {
        final Map<Label, Set<JobId>> labelJobIdMap = new HashMap();
        final Map<JobId, List<Label>> jobIdToLabelMap = new HashMap();
        private final Logger logger = LoggerFactory.getLogger(LabelCache.class);

        LabelCache() {
        }

        void addJobIdToLabelCache(JobId jobId, List<Label> list) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("addJobIdToLabelCache " + jobId + " labelList " + list + " current map " + this.labelJobIdMap);
            }
            if (list == null) {
                return;
            }
            for (Label label : list) {
                Set<JobId> set = this.labelJobIdMap.get(label);
                if (set != null) {
                    set.add(jobId);
                } else {
                    HashSet hashSet = new HashSet();
                    hashSet.add(jobId);
                    this.labelJobIdMap.put(label, hashSet);
                }
            }
            this.jobIdToLabelMap.put(jobId, list);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit addJobIdToLabelCache " + jobId + " labelList " + list + " new map " + this.labelJobIdMap);
            }
        }

        void removeJobIdFromLabelCache(JobId jobId) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("removeJobIdFromLabelCache " + jobId + " current map " + this.labelJobIdMap);
            }
            List<Label> list = this.jobIdToLabelMap.get(jobId);
            if (list != null) {
                for (Label label : list) {
                    Set<JobId> set = this.labelJobIdMap.get(label);
                    set.remove(jobId);
                    if (set.isEmpty()) {
                        this.labelJobIdMap.remove(label);
                    }
                }
            }
            this.jobIdToLabelMap.remove(jobId);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit removeJobIdFromLabelCache " + jobId + " current map " + this.labelJobIdMap);
            }
        }

        Set<JobId> getJobIdsMatchingLabels(List<Label> list, boolean z) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Entering getJobidsMatchingLabels " + list + " is and ? " + z + " with map " + this.labelJobIdMap);
            }
            HashSet hashSet = new HashSet();
            ArrayList arrayList = new ArrayList();
            if (list == null) {
                return hashSet;
            }
            for (Label label : list) {
                if (this.labelJobIdMap.containsKey(label)) {
                    HashSet hashSet2 = new HashSet();
                    hashSet2.addAll(this.labelJobIdMap.get(label));
                    arrayList.add(hashSet2);
                } else {
                    arrayList.add(new HashSet());
                }
            }
            Set<JobId> setIntersection = z ? getSetIntersection(arrayList) : getSetUnion(arrayList);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exiting getJobidsMatchingLabels " + setIntersection);
            }
            return setIntersection;
        }

        private Set<JobId> getSetUnion(List<Set<JobId>> list) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getSetUnion " + list);
            }
            HashSet hashSet = new HashSet();
            if (list == null || list.isEmpty()) {
                return hashSet;
            }
            int i = 0;
            Set<JobId> set = list.get(0);
            while (true) {
                i++;
                if (i >= list.size()) {
                    break;
                }
                set.addAll(list.get(i));
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit  getSetUnion " + set);
            }
            return set;
        }

        private Set<JobId> getSetIntersection(List<Set<JobId>> list) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("In getSetIntersection " + list);
            }
            HashSet hashSet = new HashSet();
            if (list == null || list.isEmpty()) {
                return hashSet;
            }
            int i = 0;
            Set<JobId> set = list.get(0);
            while (true) {
                i++;
                if (i >= list.size()) {
                    break;
                }
                set.retainAll(list.get(i));
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Return getSetIntersection " + set);
            }
            return set;
        }
    }

    public static Props props(String str, MantisJobStore mantisJobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator) {
        return Props.create(JobClusterActor.class, new Object[]{str, mantisJobStore, mantisSchedulerFactory, lifecycleEventPublisher, costsCalculator});
    }

    public JobClusterActor(String str, MantisJobStore mantisJobStore, MantisSchedulerFactory mantisSchedulerFactory, LifecycleEventPublisher lifecycleEventPublisher, CostsCalculator costsCalculator) {
        this.name = str;
        this.jobStore = mantisJobStore;
        this.mantisSchedulerFactory = mantisSchedulerFactory;
        this.eventPublisher = lifecycleEventPublisher;
        this.jobManager = new JobManager(str, getContext(), this.mantisSchedulerFactory, lifecycleEventPublisher, mantisJobStore, costsCalculator);
        MetricGroupId metricGroupId = getMetricGroupId(str);
        Metrics registerAndGet = MetricsRegistry.getInstance().registerAndGet(new Metrics.Builder().id(metricGroupId).addCounter("numJobSubmissions").addCounter("numJobSubmissionFailures").addCounter("numJobShutdowns").addCounter("numJobActorCreationCounter").addCounter("numJobsInitialized").addCounter("numJobClustersInitialized").addCounter("numJobClusterInitializeFailures").addCounter("numJobClusterEnable").addCounter("numJobClusterEnableErrors").addCounter("numJobClusterDisable").addCounter("numJobClusterDisableErrors").addCounter("numJobClusterDelete").addCounter("numJobClusterDeleteErrors").addCounter("numJobClusterUpdate").addCounter("numJobClusterUpdateErrors").addCounter("numSLAEnforcementExecutions").addGauge(new GaugeCallback(metricGroupId, "acceptedJobsGauge", () -> {
            return Double.valueOf(1.0d * this.jobManager.acceptedJobsCount());
        })).addGauge(new GaugeCallback(metricGroupId, "activeJobsGauge", () -> {
            return Double.valueOf(1.0d * this.jobManager.activeJobsCount());
        })).addGauge(new GaugeCallback(metricGroupId, "terminatingJobsGauge", () -> {
            return Double.valueOf(1.0d * this.jobManager.terminatingJobsMap.size());
        })).addGauge(new GaugeCallback(metricGroupId, "completedJobsGauge", () -> {
            return Double.valueOf(1.0d * this.jobManager.completedJobsCache.completedJobs.size());
        })).addGauge(new GaugeCallback(metricGroupId, "actorToJobIdMappingsGauge", () -> {
            return Double.valueOf(1.0d * this.jobManager.actorToJobIdMap.size());
        })).build());
        this.numJobSubmissions = registerAndGet.getCounter("numJobSubmissions");
        this.numJobActorCreationCounter = registerAndGet.getCounter("numJobActorCreationCounter");
        this.numJobSubmissionFailures = registerAndGet.getCounter("numJobSubmissionFailures");
        this.numJobShutdowns = registerAndGet.getCounter("numJobShutdowns");
        this.numJobsInitialized = registerAndGet.getCounter("numJobsInitialized");
        this.numJobClustersInitialized = registerAndGet.getCounter("numJobClustersInitialized");
        this.numJobClusterInitializeFailures = registerAndGet.getCounter("numJobClusterInitializeFailures");
        this.numJobClusterEnable = registerAndGet.getCounter("numJobClusterEnable");
        this.numJobClusterDisable = registerAndGet.getCounter("numJobClusterDisable");
        this.numJobClusterDelete = registerAndGet.getCounter("numJobClusterDelete");
        this.numJobClusterUpdate = registerAndGet.getCounter("numJobClusterUpdate");
        this.numJobClusterEnableErrors = registerAndGet.getCounter("numJobClusterEnableErrors");
        this.numJobClusterDisableErrors = registerAndGet.getCounter("numJobClusterDisableErrors");
        this.numJobClusterDeleteErrors = registerAndGet.getCounter("numJobClusterDeleteErrors");
        this.numJobClusterUpdateErrors = registerAndGet.getCounter("numJobClusterUpdateErrors");
        this.numSLAEnforcementExecutions = registerAndGet.getCounter("numSLAEnforcementExecutions");
    }

    public AbstractActor.Receive createReceive() {
        return buildInitialBehavior();
    }

    private AbstractActor.Receive buildDisabledBehavior() {
        String str = "disabled";
        return receiveBuilder().match(JobClusterManagerProto.UpdateJobClusterRequest.class, this::onJobClusterUpdate).match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, this::onJobClusterUpdateLabels).match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, this::onJobClusterUpdateSLA).match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, this::onJobClusterUpdateArtifact).match(JobClustersManagerActor.UpdateSchedulingInfo.class, this::onJobClusterUpdateSchedulingInfo).match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, this::onJobClusterUpdateWorkerMigrationConfig).match(JobClusterManagerProto.GetJobClusterRequest.class, this::onJobClusterGet).match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete).match(JobClusterManagerProto.ListArchivedWorkersRequest.class, this::onListArchivedWorkers).match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, this::onJobListCompleted).match(JobClusterProto.KillJobResponse.class, this::onKillJobResponse).match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetailsRequest).match(WorkerEvent.class, this::onWorkerEvent).match(JobClusterProto.ExpireOldJobsRequest.class, this::onExpireOldJobs).match(JobClusterManagerProto.EnableJobClusterRequest.class, this::onJobClusterEnable).match(Terminated.class, this::onTerminated).match(JobClusterManagerProto.SubmitJobRequest.class, submitJobRequest -> {
            getSender().tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(submitJobRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, getJobDefinitionUpdatedFromJobActorResponse -> {
            getSender().tell(new JobClusterManagerProto.SubmitJobResponse(getJobDefinitionUpdatedFromJobActorResponse.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobDefinitionUpdatedFromJobActorResponse.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.name, str)), getSelf());
        }).match(JobProto.JobInitialized.class, jobInitialized -> {
            this.logger.warn(genUnexpectedMsg(jobInitialized.toString(), this.name, str));
        }).match(JobClusterProto.JobStartedEvent.class, jobStartedEvent -> {
            this.logger.warn(genUnexpectedMsg(jobStartedEvent.toString(), this.name, str));
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.name, str), 0), getSelf());
        }).match(JobClusterProto.KillJobRequest.class, killJobRequest -> {
            killJobRequest.requestor.tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, genUnexpectedMsg(killJobRequest.toString(), this.name, str), killJobRequest.jobId, killJobRequest.user), getSelf());
        }).match(JobClusterManagerProto.GetJobDetailsRequest.class, getJobDetailsRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobDetailsRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, getJobSchedInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobSchedInfoRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, getLatestJobDiscoveryInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLatestJobDiscoveryInfoRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, getLastSubmittedJobIdStreamRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(getLastSubmittedJobIdStreamRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLastSubmittedJobIdStreamRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.ListJobIdsRequest.class, listJobIdsRequest -> {
            getSender().tell(new JobClusterManagerProto.ListJobIdsResponse(listJobIdsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listJobIdsRequest.toString(), this.name, str), new ArrayList()), getSelf());
        }).match(JobClusterManagerProto.ListJobsRequest.class, listJobsRequest -> {
            getSender().tell(new JobClusterManagerProto.ListJobsResponse(listJobsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listJobsRequest.toString(), this.name, str), new ArrayList()), getSelf());
        }).match(JobClusterManagerProto.ListWorkersRequest.class, listWorkersRequest -> {
            getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listWorkersRequest.toString(), this.name, str), new ArrayList()), getSelf());
        }).match(JobClusterProto.EnforceSLARequest.class, enforceSLARequest -> {
            this.logger.warn(genUnexpectedMsg(enforceSLARequest.toString(), this.name, str));
        }).match(JobClusterProto.TriggerCronRequest.class, triggerCronRequest -> {
            this.logger.warn(genUnexpectedMsg(triggerCronRequest.toString(), this.name, str));
        }).match(JobClusterManagerProto.DisableJobClusterRequest.class, disableJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.DisableJobClusterResponse(disableJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already disabled"), getSelf());
        }).match(Terminated.class, this::onTerminated).match(JobClusterProto.InitializeJobClusterRequest.class, initializeJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already initialized"), getSelf());
        }).matchAny(obj -> {
            this.logger.warn("unexpected message '{}' received by JobCluster actor {} in Disabled State", obj, this.name);
        }).build();
    }

    private String genUnexpectedMsg(String str, String str2, String str3) {
        return String.format("Unexpected message %s received by JobCluster actor %s in %s State", str, str2, str3);
    }

    private AbstractActor.Receive buildInitialBehavior() {
        String str = "Uninited";
        return receiveBuilder().match(JobClusterProto.InitializeJobClusterRequest.class, this::onJobClusterInitialize).match(JobClusterManagerProto.UpdateJobClusterRequest.class, updateJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.UpdateJobClusterResponse(updateJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateJobClusterRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, updateJobClusterLabelsRequest -> {
            getSender().tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(updateJobClusterLabelsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateJobClusterLabelsRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, updateJobClusterSLARequest -> {
            getSender().tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(updateJobClusterSLARequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateJobClusterSLARequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, updateJobClusterArtifactRequest -> {
            getSender().tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateJobClusterArtifactRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateJobClusterArtifactRequest.toString(), this.name, str)), getSelf());
        }).match(JobClustersManagerActor.UpdateSchedulingInfo.class, updateSchedulingInfo -> {
            getSender().tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(updateSchedulingInfo.getRequestId(), BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateSchedulingInfo.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, updateJobClusterWorkerMigrationStrategyRequest -> {
            getSender().tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(updateJobClusterWorkerMigrationStrategyRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(updateJobClusterWorkerMigrationStrategyRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.GetJobClusterRequest.class, getJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobClusterResponse(getJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobClusterRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterProto.DeleteJobClusterRequest.class, deleteJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.DeleteJobClusterResponse(deleteJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(deleteJobClusterRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.ListArchivedWorkersRequest.class, listArchivedWorkersRequest -> {
            getSender().tell(new JobClusterManagerProto.ListArchivedWorkersResponse(listArchivedWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listArchivedWorkersRequest.toString(), this.name, str), Lists.newArrayList()), getSelf());
        }).match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, listCompletedJobsInClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.ListCompletedJobsInClusterResponse(listCompletedJobsInClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listCompletedJobsInClusterRequest.toString(), this.name, str), Lists.newArrayList()), getSelf());
        }).match(JobClusterProto.KillJobResponse.class, killJobResponse -> {
            this.logger.warn(genUnexpectedMsg(killJobResponse.toString(), this.name, str));
        }).match(JobClusterManagerProto.GetJobDetailsRequest.class, getJobDetailsRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobDetailsRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(WorkerEvent.class, workerEvent -> {
            this.logger.warn(genUnexpectedMsg(workerEvent.toString(), this.name, str));
        }).match(JobClusterProto.ExpireOldJobsRequest.class, expireOldJobsRequest -> {
            this.logger.warn(genUnexpectedMsg(expireOldJobsRequest.toString(), this.name, str));
        }).match(JobClusterManagerProto.EnableJobClusterRequest.class, enableJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.EnableJobClusterResponse(enableJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(enableJobClusterRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.SubmitJobRequest.class, submitJobRequest -> {
            getSender().tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(submitJobRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, getJobDefinitionUpdatedFromJobActorResponse -> {
            getSender().tell(new JobClusterManagerProto.SubmitJobResponse(getJobDefinitionUpdatedFromJobActorResponse.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobDefinitionUpdatedFromJobActorResponse.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.ResubmitWorkerRequest.class, resubmitWorkerRequest -> {
            getSender().tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(resubmitWorkerRequest.toString(), this.name, str)), getSelf());
        }).match(JobProto.JobInitialized.class, jobInitialized -> {
            this.logger.warn(genUnexpectedMsg(jobInitialized.toString(), this.name, str));
        }).match(JobClusterProto.JobStartedEvent.class, jobStartedEvent -> {
            this.logger.warn(genUnexpectedMsg(jobStartedEvent.toString(), this.name, str));
        }).match(JobClusterManagerProto.ScaleStageRequest.class, scaleStageRequest -> {
            getSender().tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(scaleStageRequest.toString(), this.name, str), 0), getSelf());
        }).match(JobClusterProto.KillJobRequest.class, killJobRequest -> {
            getSender().tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, genUnexpectedMsg(killJobRequest.toString(), this.name, str), killJobRequest.jobId, killJobRequest.user), getSelf());
        }).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, getJobSchedInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getJobSchedInfoRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, getLatestJobDiscoveryInfoRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLatestJobDiscoveryInfoRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, getLastSubmittedJobIdStreamRequest -> {
            getSender().tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(getLastSubmittedJobIdStreamRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(getLastSubmittedJobIdStreamRequest.toString(), this.name, str), Optional.empty()), getSelf());
        }).match(JobClusterManagerProto.ListJobIdsRequest.class, listJobIdsRequest -> {
            getSender().tell(new JobClusterManagerProto.ListJobIdsResponse(listJobIdsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listJobIdsRequest.toString(), this.name, str), Lists.newArrayList()), getSelf());
        }).match(JobClusterManagerProto.ListJobsRequest.class, listJobsRequest -> {
            getSender().tell(new JobClusterManagerProto.ListJobsResponse(listJobsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listJobsRequest.toString(), this.name, str), Lists.newArrayList()), getSelf());
        }).match(JobClusterManagerProto.ListWorkersRequest.class, listWorkersRequest -> {
            getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(listWorkersRequest.toString(), this.name, str), Lists.newArrayList()), getSelf());
        }).match(JobClusterProto.EnforceSLARequest.class, enforceSLARequest -> {
            this.logger.warn(genUnexpectedMsg(enforceSLARequest.toString(), this.name, str));
        }).match(JobClusterProto.ExpireOldJobsRequest.class, expireOldJobsRequest2 -> {
            this.logger.warn(genUnexpectedMsg(expireOldJobsRequest2.toString(), this.name, str));
        }).match(JobClusterProto.TriggerCronRequest.class, triggerCronRequest -> {
            this.logger.warn(genUnexpectedMsg(triggerCronRequest.toString(), this.name, str));
        }).match(JobClusterManagerProto.DisableJobClusterRequest.class, disableJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.DisableJobClusterResponse(disableJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, genUnexpectedMsg(disableJobClusterRequest.toString(), this.name, str)), getSelf());
        }).match(Terminated.class, this::onTerminated).matchAny(obj -> {
            this.logger.warn("unexpected message '{}' received by JobCluster actor {} in Uninited State", obj, this.name);
        }).build();
    }

    private AbstractActor.Receive buildInitializedBehavior() {
        String str = "Initialized";
        return receiveBuilder().match(JobClusterManagerProto.UpdateJobClusterRequest.class, this::onJobClusterUpdate).match(JobClusterManagerProto.UpdateJobClusterLabelsRequest.class, this::onJobClusterUpdateLabels).match(JobClusterManagerProto.UpdateJobClusterSLARequest.class, this::onJobClusterUpdateSLA).match(JobClusterManagerProto.UpdateJobClusterArtifactRequest.class, this::onJobClusterUpdateArtifact).match(JobClustersManagerActor.UpdateSchedulingInfo.class, this::onJobClusterUpdateSchedulingInfo).match(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest.class, this::onJobClusterUpdateWorkerMigrationConfig).match(JobClusterManagerProto.EnableJobClusterRequest.class, enableJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.EnableJobClusterResponse(enableJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, genUnexpectedMsg(enableJobClusterRequest.toString(), this.name, str)), getSelf());
        }).match(JobClusterManagerProto.GetJobClusterRequest.class, this::onJobClusterGet).match(JobClusterProto.DeleteJobClusterRequest.class, this::onJobClusterDelete).match(JobClusterManagerProto.ListArchivedWorkersRequest.class, this::onListArchivedWorkers).match(JobClusterManagerProto.ListCompletedJobsInClusterRequest.class, this::onJobListCompleted).match(JobClusterProto.KillJobResponse.class, this::onKillJobResponse).match(JobClusterProto.ExpireOldJobsRequest.class, this::onExpireOldJobs).match(WorkerEvent.class, this::onWorkerEvent).match(JobClusterManagerProto.DisableJobClusterRequest.class, this::onJobClusterDisable).match(JobClusterProto.EnforceSLARequest.class, this::onEnforceSLARequest).match(JobClusterProto.BookkeepingRequest.class, this::onBookkeepingRequest).match(JobClusterProto.TriggerCronRequest.class, this::onTriggerCron).match(JobClusterManagerProto.ListJobIdsRequest.class, this::onJobIdList).match(JobClusterManagerProto.ListJobsRequest.class, this::onJobList).match(JobClusterManagerProto.ListWorkersRequest.class, this::onListActiveWorkers).match(JobClusterManagerProto.SubmitJobRequest.class, this::onJobSubmit).match(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse.class, this::onGetJobDefinitionUpdatedFromJobActorResponse).match(JobClusterManagerProto.GetJobDetailsRequest.class, this::onGetJobDetailsRequest).match(JobClusterManagerProto.GetJobSchedInfoRequest.class, this::onGetJobStatusSubject).match(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest.class, this::onGetLatestJobDiscoveryInfo).match(JobClusterProto.KillJobRequest.class, this::onJobKillRequest).match(JobClusterManagerProto.ResubmitWorkerRequest.class, this::onResubmitWorkerRequest).match(JobProto.JobInitialized.class, this::onJobInitialized).match(JobClusterProto.JobStartedEvent.class, this::onJobStarted).match(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest.class, this::onGetLastSubmittedJobIdSubject).match(JobClusterManagerProto.ScaleStageRequest.class, this::onScaleStage).match(JobClusterProto.InitializeJobClusterRequest.class, initializeJobClusterRequest -> {
            getSender().tell(new JobClusterManagerProto.JobClustersManagerInitializeResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "Cluster is already initialized"), getSelf());
        }).match(Terminated.class, this::onTerminated).matchAny(obj -> {
            this.logger.info("unexpected message '{}' received by JobCluster actor {} in Initialized State.from class {}", new Object[]{obj, this.name, obj.getClass().getCanonicalName()});
        }).build();
    }

    MetricGroupId getMetricGroupId(String str) {
        return new MetricGroupId("JobClusterActor", new Tag[]{new BasicTag("jobCluster", str)});
    }

    public void preStart() throws Exception {
        this.logger.info("JobClusterActor {} started", this.name);
        super.preStart();
    }

    public void postStop() throws Exception {
        this.logger.info("JobClusterActor {} stopped", this.name);
        super.postStop();
        if (this.name != null) {
            MetricsRegistry.getInstance().remove(getMetricGroupId(this.name));
        }
    }

    public void preRestart(Throwable th, Optional<Object> optional) throws Exception {
        this.logger.info("{} preRestart {} (exc: {})", new Object[]{this.name, optional, th.getMessage()});
    }

    public void postRestart(Throwable th) throws Exception {
        this.logger.info("{} postRestart (exc={})", this.name, th.getMessage());
        super.postRestart(th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return MantisActorSupervisorStrategy.getInstance().create();
    }

    private void setBookkeepingTimer(long j) {
        getTimers().startPeriodicTimer(BOOKKEEPING_TIMER_KEY, new JobClusterProto.BookkeepingRequest(), Duration.ofSeconds(j));
    }

    private void setExpiredJobsTimer(long j) {
        getTimers().startPeriodicTimer(CHECK_EXPIRED_TIMER_KEY, new JobClusterProto.ExpireOldJobsRequest(), Duration.ofSeconds(j));
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterInitialize(JobClusterProto.InitializeJobClusterRequest initializeJobClusterRequest) {
        ActorRef sender = getSender();
        this.logger.info("In onJobClusterInitialize {}", this.name);
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Init Request {}", initializeJobClusterRequest);
        }
        this.jobClusterMetadata = new JobClusterMetadataImpl.Builder().withLastJobCount(initializeJobClusterRequest.lastJobNumber).withIsDisabled(initializeJobClusterRequest.isDisabled).withJobClusterDefinition(initializeJobClusterRequest.jobClusterDefinition).build();
        this.slaEnforcer = new SLAEnforcer(this.jobClusterMetadata.getJobClusterDefinition().getSLA());
        long completedJobPurgeFrequencySeqs = ConfigurationProvider.getConfig().getCompletedJobPurgeFrequencySeqs();
        if (this.jobClusterMetadata.isDisabled()) {
            this.logger.info("Cluster {} initialized but is Disabled", this.jobClusterMetadata.getJobClusterDefinition().getName());
            this.jobManager.addCompletedJobsToCache(initializeJobClusterRequest.completedJobsList);
            int i = 50;
            if (!initializeJobClusterRequest.jobList.isEmpty()) {
                this.logger.info("Cluster {} is disabled however it has {} active/accepted jobs", this.jobClusterMetadata.getJobClusterDefinition().getName(), Integer.valueOf(initializeJobClusterRequest.jobList.size()));
                for (IMantisJobMetadata iMantisJobMetadata : initializeJobClusterRequest.jobList) {
                    if (i == 0) {
                        this.logger.info("Max cleanup limit of 50 reached abort");
                        break;
                    }
                    try {
                        if (!JobState.isTerminalState(iMantisJobMetadata.getState())) {
                            this.logger.info("Job {} is in non terminal state {} for disabled cluster {}.Marking it complete", new Object[]{iMantisJobMetadata.getJobId(), iMantisJobMetadata.getState(), this.jobClusterMetadata.getJobClusterDefinition().getName()});
                            i--;
                            this.jobManager.markCompletedDuringStartup(iMantisJobMetadata.getJobId(), System.currentTimeMillis(), iMantisJobMetadata, JobState.Completed);
                            this.jobStore.archiveJob(iMantisJobMetadata);
                        }
                    } catch (Exception e) {
                        this.logger.error("Exception {} archiving job {} during init ", new Object[]{e.getMessage(), iMantisJobMetadata.getJobId(), e});
                    }
                    this.logger.error("Exception {} archiving job {} during init ", new Object[]{e.getMessage(), iMantisJobMetadata.getJobId(), e});
                }
            }
            sender.tell(new JobClusterProto.InitializeJobClusterResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("JobCluster %s initialized successfully. But is currently disabled", initializeJobClusterRequest.jobClusterDefinition.getName()), initializeJobClusterRequest.jobClusterDefinition.getName(), initializeJobClusterRequest.requestor), getSelf());
            this.logger.info("Job expiry check frequency set to {}", Long.valueOf(completedJobPurgeFrequencySeqs));
            setExpiredJobsTimer(completedJobPurgeFrequencySeqs);
            getContext().become(this.disabledBehavior);
            return;
        }
        if (initializeJobClusterRequest.createInStore) {
            try {
                this.jobStore.createJobCluster(this.jobClusterMetadata);
                this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_CREATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), "saved job cluster " + this.name));
                this.logger.info("successfully saved job cluster {}", this.name);
                this.numJobClustersInitialized.increment();
            } catch (JobClusterAlreadyExistsException e2) {
                this.numJobClusterInitializeFailures.increment();
                this.logger.error("job cluster not created");
                sender.tell(new JobClusterProto.InitializeJobClusterResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, String.format("JobCluster %s already exists", initializeJobClusterRequest.jobClusterDefinition.getName()), initializeJobClusterRequest.jobClusterDefinition.getName(), initializeJobClusterRequest.requestor), getSelf());
                return;
            } catch (Exception e3) {
                this.numJobClusterInitializeFailures.increment();
                this.logger.error("job cluster not created due to {}", e3.getMessage(), e3);
                sender.tell(new JobClusterProto.InitializeJobClusterResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, String.format("JobCluster %s not created due to %s", initializeJobClusterRequest.jobClusterDefinition.getName(), Throwables.getStackTraceAsString(e3)), initializeJobClusterRequest.jobClusterDefinition.getName(), initializeJobClusterRequest.requestor), getSelf());
                return;
            }
        }
        try {
            this.cronManager = new CronManager(this.name, getSelf(), this.jobClusterMetadata.getJobClusterDefinition().getSLA());
        } catch (Exception e4) {
            this.logger.warn("Exception initializing cron", e4);
        }
        initRunningJobs(initializeJobClusterRequest, sender);
        setExpiredJobsTimer(completedJobPurgeFrequencySeqs);
        this.logger.info("Job expiry check frequency set to {}", Long.valueOf(completedJobPurgeFrequencySeqs));
        try {
            this.jobManager.addCompletedJobsToCache(initializeJobClusterRequest.completedJobsList);
        } catch (Exception e5) {
            this.logger.warn("Exception initializing completed jobs " + e5.getMessage(), e5);
        }
    }

    private void initRunningJobs(JobClusterProto.InitializeJobClusterRequest initializeJobClusterRequest, ActorRef actorRef) {
        List<JobClusterDefinitionImpl.CompletedJob> list = initializeJobClusterRequest.completedJobsList;
        List<IMantisJobMetadata> list2 = initializeJobClusterRequest.jobList;
        this.logger.info("In _initJobs for cluster {}: {} activeJobs and {} completedJobs", new Object[]{this.name, Integer.valueOf(list2.size()), Integer.valueOf(list.size())});
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("In _initJobs for cluster {} activeJobs -> {} and completedJobs -> {}", new Object[]{this.name, list2, list});
        }
        Observable.from(list2).flatMap(iMantisJobMetadata -> {
            if (JobState.isTerminalState(iMantisJobMetadata.getState())) {
                this.jobManager.persistToCompletedJobAndArchiveJobTables(iMantisJobMetadata);
                return Observable.empty();
            }
            if (iMantisJobMetadata.getSchedulingInfo() != null) {
                return Observable.just(iMantisJobMetadata);
            }
            this.logger.error("Scheduling info is null for active job {} in cluster {}.Skipping bootstrap ", iMantisJobMetadata.getJobId(), this.name);
            return Observable.empty();
        }).flatMap(iMantisJobMetadata2 -> {
            return this.jobManager.bootstrapJob((MantisJobMetadataImpl) iMantisJobMetadata2, this.jobClusterMetadata);
        }).subscribe(jobInitialized -> {
            this.logger.info("Job Id {} initialized with code {}", jobInitialized.jobId, jobInitialized.responseCode);
        }, th -> {
            this.logger.warn("Exception initializing jobs {}", th.getMessage());
        }, () -> {
            if (initializeJobClusterRequest.jobList.size() > 0) {
                this.jobIdSubmissionSubject.onNext(new JobId(this.name, initializeJobClusterRequest.lastJobNumber));
            }
            setBookkeepingTimer(5L);
            getContext().become(this.initializedBehavior);
            this.logger.info("Job Cluster {} initialized", this.name);
            actorRef.tell(new JobClusterProto.InitializeJobClusterResponse(initializeJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("JobCluster %s initialized successfully", initializeJobClusterRequest.jobClusterDefinition.getName()), initializeJobClusterRequest.jobClusterDefinition.getName(), initializeJobClusterRequest.requestor), getSelf());
        });
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdate(JobClusterManagerProto.UpdateJobClusterRequest updateJobClusterRequest) {
        String name = updateJobClusterRequest.getJobClusterDefinition().getName();
        ActorRef sender = getSender();
        String version = updateJobClusterRequest.getJobClusterDefinition().getJobClusterConfig().getVersion();
        if (!isVersionUnique(version, this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) {
            String format = String.format("Job cluster %s not updated as the version %s is not unique", name, version);
            this.logger.error(format);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterResponse(updateJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, format), getSelf());
            return;
        }
        try {
            updateAndSaveJobCluster(new JobClusterMetadataImpl.Builder().withIsDisabled(this.jobClusterMetadata.isDisabled()).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(new JobClusterDefinitionImpl.Builder().mergeConfigsAndOverrideRest(this.jobClusterMetadata.getJobClusterDefinition(), updateJobClusterRequest.getJobClusterDefinition()).build()).build());
            sender.tell(new JobClusterManagerProto.UpdateJobClusterResponse(updateJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, name + " Job cluster updated"), getSelf());
            this.numJobClusterUpdate.increment();
        } catch (Exception e) {
            this.logger.error("job cluster not created");
            sender.tell(new JobClusterManagerProto.UpdateJobClusterResponse(updateJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, name + " Job cluster updation failed " + e.getMessage()), getSelf());
            this.numJobClusterUpdateErrors.increment();
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterDelete(JobClusterProto.DeleteJobClusterRequest deleteJobClusterRequest) {
        ActorRef sender = getSender();
        try {
            if (this.jobManager.isJobListEmpty()) {
                this.jobManager.cleanupAllCompletedJobs();
                this.jobStore.deleteJobCluster(this.name);
                this.logger.info("successfully deleted job cluster {}", this.name);
                this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_DELETE, this.name, this.name + " deleted"));
                sender.tell(new JobClusterProto.DeleteJobClusterResponse(deleteJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " deleted", deleteJobClusterRequest.requestingActor, this.name), getSelf());
                this.numJobClusterDelete.increment();
            } else {
                this.logger.warn("job cluster {} cannot be deleted as it has active jobs", this.name);
                sender.tell(new JobClusterProto.DeleteJobClusterResponse(deleteJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.name + " Job cluster deletion failed as there are active jobs", deleteJobClusterRequest.requestingActor, this.name), getSelf());
            }
        } catch (Exception e) {
            this.logger.error("job cluster {} not deleted", this.name);
            sender.tell(new JobClusterProto.DeleteJobClusterResponse(deleteJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster deletion failed " + e.getMessage(), deleteJobClusterRequest.requestingActor, this.name), getSelf());
            this.numJobClusterDeleteErrors.increment();
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobIdList(JobClusterManagerProto.ListJobIdsRequest listJobIdsRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JCA:onJobIdList");
        }
        ActorRef sender = getSender();
        Set<JobId> hashSet = new HashSet();
        if (!listJobIdsRequest.getCriteria().getMatchingLabels().isEmpty()) {
            hashSet = this.jobManager.getJobsMatchingLabels(listJobIdsRequest.getCriteria().getMatchingLabels(), listJobIdsRequest.getCriteria().getLabelsOperand());
            if (hashSet.isEmpty()) {
                sender.tell(new JobClusterManagerProto.ListJobIdsResponse(listJobIdsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "No JobIds match given Label criterion", new ArrayList()), sender);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit JCA:onJobIdList");
                    return;
                }
                return;
            }
        }
        List<JobClusterProtoAdapter.JobIdInfo> filteredNonTerminalJobIdList = getFilteredNonTerminalJobIdList(listJobIdsRequest.filters, hashSet);
        if (!listJobIdsRequest.getCriteria().getActiveOnly().orElse(true).booleanValue()) {
            filteredNonTerminalJobIdList.addAll(getFilteredTerminalJobIdList(listJobIdsRequest.filters, hashSet));
        }
        sender.tell(new JobClusterManagerProto.ListJobIdsResponse(listJobIdsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", filteredNonTerminalJobIdList), sender);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JCA:onJobIdList");
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobList(JobClusterManagerProto.ListJobsRequest listJobsRequest) {
        if (this.logger.isDebugEnabled()) {
            this.logger.info("Entering JCA:onJobList");
        }
        ActorRef sender = getSender();
        ActorRef self = getSelf();
        Set<JobId> hashSet = new HashSet();
        if (!listJobsRequest.getCriteria().getMatchingLabels().isEmpty()) {
            hashSet = this.jobManager.getJobsMatchingLabels(listJobsRequest.getCriteria().getMatchingLabels(), listJobsRequest.getCriteria().getLabelsOperand());
            if (hashSet.isEmpty()) {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit JCA:onJobList {}", Integer.valueOf(hashSet.size()));
                }
                sender.tell(new JobClusterManagerProto.ListJobsResponse(listJobsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", new ArrayList()), self);
                return;
            }
        }
        getFilteredNonTerminalJobList(listJobsRequest.getCriteria(), hashSet).mergeWith(getFilteredTerminalJobList(listJobsRequest.getCriteria(), hashSet)).collect(() -> {
            return Lists.newArrayList();
        }, (v0, v1) -> {
            v0.add(v1);
        }).doOnNext(arrayList -> {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JCA:onJobList {}", Integer.valueOf(arrayList.size()));
            }
            sender.tell(new JobClusterManagerProto.ListJobsResponse(listJobsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", arrayList), self);
        }).subscribe();
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onListArchivedWorkers(JobClusterManagerProto.ListArchivedWorkersRequest listArchivedWorkersRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("In onListArchiveWorkers {}", listArchivedWorkersRequest);
        }
        try {
            List<IMantisWorkerMetadata> archivedWorkers = this.jobStore.getArchivedWorkers(listArchivedWorkersRequest.getJobId().getId());
            if (archivedWorkers.size() > listArchivedWorkersRequest.getLimit()) {
                archivedWorkers = archivedWorkers.subList(0, listArchivedWorkersRequest.getLimit());
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Returning {} archived Workers", Integer.valueOf(archivedWorkers.size()));
            }
            getSender().tell(new JobClusterManagerProto.ListArchivedWorkersResponse(listArchivedWorkersRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", archivedWorkers), getSelf());
        } catch (Exception e) {
            this.logger.error("Exception listing archived workers", e);
            getSender().tell(new JobClusterManagerProto.ListArchivedWorkersResponse(listArchivedWorkersRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "Exception getting archived workers for job " + listArchivedWorkersRequest.getJobId() + " -> " + e.getMessage(), Lists.newArrayList()), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onListActiveWorkers(JobClusterManagerProto.ListWorkersRequest listWorkersRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:onListActiveWorkers {}", listWorkersRequest);
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(listWorkersRequest.getJobId());
        if (jobInfoForNonTerminalJob.isPresent()) {
            jobInfoForNonTerminalJob.get().jobActor.forward(listWorkersRequest, getContext());
        } else {
            this.logger.warn("No such active job {} ", listWorkersRequest.getJobId());
            getSender().tell(new JobClusterManagerProto.ListWorkersResponse(listWorkersRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "No such active job " + listWorkersRequest.getJobId(), Lists.newArrayList()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onListActiveWorkers {}", listWorkersRequest);
        }
    }

    private List<JobClusterProtoAdapter.JobIdInfo> getFilteredNonTerminalJobIdList(JobClusterManagerProto.ListJobCriteria listJobCriteria, Set<JobId> set) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:getFilteredNonTerminalJobIdList {}", listJobCriteria);
        }
        if (listJobCriteria.getJobState().isPresent() && listJobCriteria.getJobState().get().equals(JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        List<JobInfo> allNonTerminalJobsList = !set.isEmpty() ? (List) set.stream().map(jobId -> {
            return this.jobManager.getJobInfoForNonTerminalJob(jobId);
        }).filter(optional -> {
            return optional.isPresent();
        }).map(optional2 -> {
            return (JobInfo) optional2.get();
        }).collect(Collectors.toList()) : this.jobManager.getAllNonTerminalJobsList();
        List<JobClusterProtoAdapter.JobIdInfo> list = (List) allNonTerminalJobsList.subList(0, Math.min(allNonTerminalJobsList.size(), listJobCriteria.getLimit().orElse(DEFAULT_LIMIT).intValue())).stream().map(jobInfo -> {
            return new JobClusterProtoAdapter.JobIdInfo.Builder().withJobId(jobInfo.jobId).withJobState(jobInfo.state).withSubmittedAt(jobInfo.submittedAt).withTerminatedAt(jobInfo.terminatedAt).withUser(jobInfo.user).withVersion(jobInfo.jobDefinition.getVersion()).build();
        }).collect(Collectors.toList());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobIdList {}", Integer.valueOf(list.size()));
        }
        return list;
    }

    private List<JobClusterProtoAdapter.JobIdInfo> getFilteredTerminalJobIdList(JobClusterManagerProto.ListJobCriteria listJobCriteria, Set<JobId> set) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor:getFilteredTerminalJobIdList {}", listJobCriteria);
        }
        if (listJobCriteria.getJobState().isPresent() && !listJobCriteria.getJobState().get().equals(JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        if (!listJobCriteria.getJobState().isPresent() && listJobCriteria.getActiveOnly().isPresent() && listJobCriteria.getActiveOnly().get().booleanValue()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList with empty");
            }
            return Collections.emptyList();
        }
        List<JobClusterDefinitionImpl.CompletedJob> completedJobsList = !set.isEmpty() ? (List) set.stream().map(jobId -> {
            return this.jobManager.getCompletedJob(jobId);
        }).filter(optional -> {
            return optional.isPresent();
        }).map(optional2 -> {
            return (JobClusterDefinitionImpl.CompletedJob) optional2.get();
        }).collect(Collectors.toList()) : this.jobManager.getCompletedJobsList();
        List<JobClusterProtoAdapter.JobIdInfo> list = (List) completedJobsList.subList(0, Math.min(completedJobsList.size(), listJobCriteria.getLimit().orElse(DEFAULT_LIMIT).intValue())).stream().map(completedJob -> {
            return new JobClusterProtoAdapter.JobIdInfo.Builder().withJobIdStr(completedJob.getJobId()).withVersion(completedJob.getVersion()).withUser(completedJob.getUser()).withSubmittedAt(completedJob.getSubmittedAt()).withTerminatedAt(completedJob.getTerminatedAt()).withJobState(completedJob.getState()).build();
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobIdList {}", Integer.valueOf(list.size()));
        }
        return list;
    }

    private Observable<MantisJobMetadataView> getFilteredNonTerminalJobList(JobClusterManagerProto.ListJobCriteria listJobCriteria, Set<JobId> set) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:getFilteredNonTerminalJobList");
        }
        Duration ofMillis = Duration.ofMillis(500L);
        if (listJobCriteria.getJobState().isPresent() && listJobCriteria.getJobState().get().equals(JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredNonTerminalJobList with empty");
            }
            return Observable.empty();
        }
        List<JobInfo> allNonTerminalJobsList = !set.isEmpty() ? (List) set.stream().map(jobId -> {
            return this.jobManager.getJobInfoForNonTerminalJob(jobId);
        }).filter(optional -> {
            return optional.isPresent();
        }).map(optional2 -> {
            return (JobInfo) optional2.get();
        }).collect(Collectors.toList()) : this.jobManager.getAllNonTerminalJobsList();
        List<JobInfo> subList = allNonTerminalJobsList.subList(0, Math.min(allNonTerminalJobsList.size(), listJobCriteria.getLimit().orElse(DEFAULT_ACTIVE_JOB_LIMIT).intValue()));
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("List of non terminal jobs {}", allNonTerminalJobsList);
        }
        return Observable.from(subList).flatMap(jobInfo -> {
            CompletionStage ask = PatternsCS.ask(jobInfo.jobActor, new JobClusterManagerProto.GetJobDetailsRequest("system", jobInfo.jobId), ofMillis);
            Class<JobClusterManagerProto.GetJobDetailsResponse> cls = JobClusterManagerProto.GetJobDetailsResponse.class;
            JobClusterManagerProto.GetJobDetailsResponse.class.getClass();
            return Observable.from(ask.thenApply(cls::cast).toCompletableFuture(), Schedulers.io()).onErrorResumeNext(th -> {
                this.logger.warn("caught exception {}", th.getMessage(), th);
                return Observable.empty();
            });
        }).filter(getJobDetailsResponse -> {
            return Boolean.valueOf(getJobDetailsResponse != null && getJobDetailsResponse.getJobMetadata().isPresent());
        }).map(getJobDetailsResponse2 -> {
            return getJobDetailsResponse2.getJobMetadata().get();
        }).map(iMantisJobMetadata -> {
            return new MantisJobMetadataView(iMantisJobMetadata, listJobCriteria.getStageNumberList(), listJobCriteria.getWorkerIndexList(), listJobCriteria.getWorkerNumberList(), listJobCriteria.getWorkerStateList(), false);
        });
    }

    private Observable<MantisJobMetadataView> getFilteredTerminalJobList(JobClusterManagerProto.ListJobCriteria listJobCriteria, Set<JobId> set) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("JobClusterActor:getFilteredTerminalJobList");
        }
        if (listJobCriteria.getJobState().isPresent() && !listJobCriteria.getJobState().get().equals(JobState.MetaState.Terminal)) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty");
            }
            return Observable.empty();
        }
        if (listJobCriteria.getJobState().isPresent() || !listJobCriteria.getActiveOnly().isPresent() || !listJobCriteria.getActiveOnly().get().booleanValue()) {
            List<JobClusterDefinitionImpl.CompletedJob> completedJobsList = !set.isEmpty() ? (List) set.stream().map(jobId -> {
                return this.jobManager.getCompletedJob(jobId);
            }).filter(optional -> {
                return optional.isPresent();
            }).map(optional2 -> {
                return (JobClusterDefinitionImpl.CompletedJob) optional2.get();
            }).collect(Collectors.toList()) : this.jobManager.getCompletedJobsList();
            return Observable.from(completedJobsList.subList(0, Math.min(completedJobsList.size(), listJobCriteria.getLimit().orElse(DEFAULT_LIMIT).intValue()))).flatMap(completedJob -> {
                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Fetching details for completed job {}", completedJob);
                    }
                    Optional<IMantisJobMetadata> jobDataForCompletedJob = this.jobManager.getJobDataForCompletedJob(completedJob.getJobId());
                    if (!jobDataForCompletedJob.isPresent()) {
                        return Observable.empty();
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Fetched details for completed job {} -> {}", completedJob, jobDataForCompletedJob.get());
                    }
                    return Observable.just(new MantisJobMetadataView(jobDataForCompletedJob.get(), completedJob.getTerminatedAt(), listJobCriteria.getStageNumberList(), listJobCriteria.getWorkerIndexList(), listJobCriteria.getWorkerNumberList(), listJobCriteria.getWorkerStateList(), false));
                } catch (Exception e) {
                    this.logger.error("caught exception", e);
                    return Observable.empty();
                }
            });
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:getFilteredTerminalJobList with empty");
        }
        return Observable.empty();
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobListCompleted(JobClusterManagerProto.ListCompletedJobsInClusterRequest listCompletedJobsInClusterRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobListCompleted {}", listCompletedJobsInClusterRequest);
        }
        ActorRef sender = getSender();
        List<JobClusterDefinitionImpl.CompletedJob> completedJobsList = this.jobManager.getCompletedJobsList();
        if (listCompletedJobsInClusterRequest.getLimit() > completedJobsList.size()) {
            completedJobsList = completedJobsList.subList(0, listCompletedJobsInClusterRequest.getLimit());
        }
        sender.tell(new JobClusterManagerProto.ListCompletedJobsInClusterResponse(listCompletedJobsInClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", completedJobsList), sender);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobListCompleted {}", Integer.valueOf(completedJobsList.size()));
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterDisable(JobClusterManagerProto.DisableJobClusterRequest disableJobClusterRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterDisable {}", disableJobClusterRequest);
        }
        ActorRef sender = getSender();
        try {
            IJobClusterMetadata build = new JobClusterMetadataImpl.Builder().withIsDisabled(true).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition((JobClusterDefinitionImpl) this.jobClusterMetadata.getJobClusterDefinition()).build();
            this.jobStore.updateJobCluster(build);
            this.jobClusterMetadata = build;
            this.cronManager.destroyCron();
            getContext().become(this.disabledBehavior);
            ArrayList<JobInfo> arrayList = new ArrayList();
            arrayList.addAll(this.jobManager.getAcceptedJobsList());
            arrayList.addAll(this.jobManager.getActiveJobsList());
            for (JobInfo jobInfo : arrayList) {
                jobInfo.jobActor.tell(new JobClusterProto.KillJobRequest(jobInfo.jobId, "Job cluster disabled", JobCompletedReason.Killed, disableJobClusterRequest.getUser(), ActorRef.noSender()), getSelf());
            }
            getTimers().cancel(BOOKKEEPING_TIMER_KEY);
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_DISABLED, build.getJobClusterDefinition().getName(), this.name + " disabled"));
            sender.tell(new JobClusterManagerProto.DisableJobClusterResponse(disableJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("%s disabled", this.name)), getSelf());
            this.numJobClusterDisable.increment();
            this.logger.info("Job Cluster {} is disabbled", this.name);
        } catch (Exception e) {
            String str = "Exception disabling cluster " + this.name + " due to " + e.getMessage();
            this.logger.error(str, e);
            sender.tell(new JobClusterManagerProto.DisableJobClusterResponse(disableJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, str), getSelf());
            this.numJobClusterDisableErrors.increment();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterDisable");
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterEnable(JobClusterManagerProto.EnableJobClusterRequest enableJobClusterRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterEnable");
        }
        ActorRef sender = getSender();
        try {
            IJobClusterMetadata build = new JobClusterMetadataImpl.Builder().withIsDisabled(false).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition((JobClusterDefinitionImpl) this.jobClusterMetadata.getJobClusterDefinition()).build();
            this.jobStore.updateJobCluster(build);
            this.jobClusterMetadata = build;
            if (this.cronManager == null) {
                this.cronManager = new CronManager(this.name, getSelf(), build.getJobClusterDefinition().getSLA());
            }
            this.cronManager.initCron();
            getContext().become(this.initializedBehavior);
            setBookkeepingTimer(5L);
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_ENABLED, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " enabled"));
            sender.tell(new JobClusterManagerProto.EnableJobClusterResponse(enableJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, String.format("%s enabled", this.name)), getSelf());
            this.numJobClusterEnable.increment();
            this.logger.info("Job Cluster {} is Enabled", this.name);
        } catch (Exception e) {
            String format = String.format("Exception enabling cluster %s due to %s", this.name, e.getMessage());
            this.logger.error(format, e);
            sender.tell(new JobClusterManagerProto.EnableJobClusterResponse(enableJobClusterRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, format), getSelf());
            this.numJobClusterEnableErrors.increment();
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterEnable");
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterGet(JobClusterManagerProto.GetJobClusterRequest getJobClusterRequest) {
        ActorRef sender = getSender();
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("In JobCluster Get " + this.jobClusterMetadata);
        }
        if (this.name.equals(getJobClusterRequest.getJobClusterName())) {
            sender.tell(new JobClusterManagerProto.GetJobClusterResponse(getJobClusterRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(generateJobClusterMetadataView(this.jobClusterMetadata, this.jobClusterMetadata.isDisabled(), ((Boolean) Optional.ofNullable(this.cronManager).map(cronManager -> {
                return Boolean.valueOf(cronManager.isCronActive);
            }).orElse(false)).booleanValue()))), getSelf());
        } else {
            sender.tell(new JobClusterManagerProto.GetJobClusterResponse(getJobClusterRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Cluster Name " + getJobClusterRequest.getJobClusterName() + " in request Does not match cluster Name " + this.name + " of Job Cluster Actor", Optional.empty()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterGet");
        }
    }

    private MantisJobClusterMetadataView generateJobClusterMetadataView(IJobClusterMetadata iJobClusterMetadata, boolean z, boolean z2) {
        return new MantisJobClusterMetadataView.Builder().withName(iJobClusterMetadata.getJobClusterDefinition().getName()).withDisabled(z).withIsReadyForJobMaster(iJobClusterMetadata.getJobClusterDefinition().getIsReadyForJobMaster()).withJars(iJobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs()).withJobOwner(iJobClusterMetadata.getJobClusterDefinition().getOwner()).withLabels(iJobClusterMetadata.getJobClusterDefinition().getLabels()).withLastJobCount(iJobClusterMetadata.getLastJobCount()).withSla(iJobClusterMetadata.getJobClusterDefinition().getSLA()).withMigrationConfig(iJobClusterMetadata.getJobClusterDefinition().getWorkerMigrationConfig()).withParameters(iJobClusterMetadata.getJobClusterDefinition().getParameters()).isCronActive(z2).withLatestVersion(iJobClusterMetadata.getJobClusterDefinition().getJobClusterConfig().getVersion()).build();
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobSubmit(JobClusterManagerProto.SubmitJobRequest submitJobRequest) {
        String userProvidedType;
        ActorRef sender = getSender();
        if (submitJobRequest.getJobDefinition().isPresent() && (userProvidedType = submitJobRequest.getJobDefinition().get().getJobSla().getUserProvidedType()) != null && !userProvidedType.isEmpty()) {
            Optional<JobInfo> jobInfoByUniqueId = this.jobManager.getJobInfoByUniqueId(userProvidedType);
            if (jobInfoByUniqueId.isPresent()) {
                this.logger.info("Job with unique {} already exists, returning its job Id {}", userProvidedType, jobInfoByUniqueId.get().jobId);
                sender.tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.SUCCESS, jobInfoByUniqueId.get().jobId.getId(), Optional.of(jobInfoByUniqueId.get().jobId)), getSelf());
                return;
            }
        }
        this.logger.info("Submitting job {}", submitJobRequest);
        try {
            if (requireJobActorProcess(submitJobRequest)) {
                this.logger.info("Sending job submit request to job actor for inheritance: {}", Long.valueOf(submitJobRequest.requestId));
                return;
            }
            JobDefinition fromJobClusterDefinition = submitJobRequest.isSubmitLatest() ? fromJobClusterDefinition(submitJobRequest.getSubmitter(), this.jobClusterMetadata.getJobClusterDefinition()) : getResolvedJobDefinition(submitJobRequest.getSubmitter(), submitJobRequest.getJobDefinition());
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobClusterStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job submit request received", this.jobClusterMetadata.getJobClusterDefinition().getName()));
            submitJob(LabelManager.insertSystemLabels(fromJobClusterDefinition, submitJobRequest.isAutoResubmit()), sender, submitJobRequest.getSubmitter());
            this.numJobSubmissions.increment();
        } catch (PersistException e) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{submitJobRequest.getClusterName(), submitJobRequest.getSubmitter(), e});
            this.numJobSubmissionFailures.increment();
            sender.tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage(), Optional.empty()), getSelf());
        } catch (Exception e2) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{submitJobRequest.getClusterName(), submitJobRequest.getSubmitter(), e2});
            this.numJobSubmissionFailures.increment();
            sender.tell(new JobClusterManagerProto.SubmitJobResponse(submitJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, e2.getMessage(), Optional.empty()), getSelf());
        }
    }

    public void onGetJobDefinitionUpdatedFromJobActorResponse(JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorResponse getJobDefinitionUpdatedFromJobActorResponse) {
        this.logger.info("Resuming job submission from job actor");
        ActorRef originalSender = getJobDefinitionUpdatedFromJobActorResponse.getOriginalSender();
        if (getJobDefinitionUpdatedFromJobActorResponse.responseCode == BaseResponse.ResponseCode.SERVER_ERROR || getJobDefinitionUpdatedFromJobActorResponse.getJobDefinition() == null) {
            this.logger.error("Failed to retrieve job definition from job actor");
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(getJobDefinitionUpdatedFromJobActorResponse.requestId, BaseResponse.ResponseCode.SERVER_ERROR, getJobDefinitionUpdatedFromJobActorResponse.message, Optional.empty()), getSelf());
            return;
        }
        try {
            JobDefinition jobDefinition = getJobDefinitionUpdatedFromJobActorResponse.getJobDefinition();
            if (getJobDefinitionUpdatedFromJobActorResponse.isQuickSubmitMode()) {
                Optional<JobDefinition> cloneToNewJobDefinitionWithoutArtifactNameAndVersion = cloneToNewJobDefinitionWithoutArtifactNameAndVersion(getJobDefinitionUpdatedFromJobActorResponse.getJobDefinition());
                if (cloneToNewJobDefinitionWithoutArtifactNameAndVersion.isPresent()) {
                    jobDefinition = cloneToNewJobDefinitionWithoutArtifactNameAndVersion.get();
                }
            }
            JobDefinition resolvedJobDefinition = this.jobDefinitionResolver.getResolvedJobDefinition(getJobDefinitionUpdatedFromJobActorResponse.getUser(), jobDefinition, this.jobClusterMetadata);
            this.eventPublisher.publishStatusEvent(new LifecycleEventsProto.JobClusterStatusEvent(LifecycleEventsProto.StatusEvent.StatusEventType.INFO, "Job submit request received", this.jobClusterMetadata.getJobClusterDefinition().getName()));
            submitJob(LabelManager.insertSystemLabels(resolvedJobDefinition, getJobDefinitionUpdatedFromJobActorResponse.isAutoResubmit()), originalSender, getJobDefinitionUpdatedFromJobActorResponse.getUser());
            this.numJobSubmissions.increment();
        } catch (PersistException e) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{this.name, getJobDefinitionUpdatedFromJobActorResponse.getUser(), e});
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(getJobDefinitionUpdatedFromJobActorResponse.requestId, BaseResponse.ResponseCode.SERVER_ERROR, e.getMessage(), Optional.empty()), getSelf());
        } catch (Exception e2) {
            this.logger.error("Exception submitting job {} from {}", new Object[]{this.name, getJobDefinitionUpdatedFromJobActorResponse.getUser(), e2});
            this.numJobSubmissionFailures.increment();
            originalSender.tell(new JobClusterManagerProto.SubmitJobResponse(getJobDefinitionUpdatedFromJobActorResponse.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, e2.getMessage(), Optional.empty()), getSelf());
        }
    }

    private boolean requireJobActorProcess(JobClusterManagerProto.SubmitJobRequest submitJobRequest) {
        String submitter = submitJobRequest.getSubmitter();
        Optional<JobDefinition> jobDefinition = submitJobRequest.getJobDefinition();
        Optional<JobId> lastSubmittedJobId = JobListHelper.getLastSubmittedJobId(this.jobManager.getAllNonTerminalJobsList(), Collections.emptyList());
        if (!lastSubmittedJobId.isPresent()) {
            this.logger.info("No valid last job id found for inheritance. Skip job actor process step.");
            return false;
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(lastSubmittedJobId.get());
        if (!jobInfoForNonTerminalJob.isPresent()) {
            this.logger.info("Last job id doesn't map to job info instance, skip job actor process. {}", lastSubmittedJobId.get());
            return false;
        }
        if (submitJobRequest.isSubmitLatest()) {
            this.logger.info("Submit latest job request, skip job actor process. {}", submitJobRequest);
            return false;
        }
        if (!jobDefinition.isPresent()) {
            this.logger.info("[QuickSubmit] pass to job actor to process job definition: {}", lastSubmittedJobId.get());
            jobInfoForNonTerminalJob.get().jobActor.tell(new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest(submitter, lastSubmittedJobId.get(), jobInfoForNonTerminalJob.get().jobDefinition, true, submitJobRequest.isAutoResubmit(), getSender()), getSelf());
            return true;
        }
        if (!jobDefinition.get().requireInheritInstanceCheck()) {
            this.logger.info("request doesn't require job actor process, skip job actor and continue.");
            return false;
        }
        this.logger.info("[Inherit request] pass to job actor to process job definition: {}", lastSubmittedJobId.get());
        jobInfoForNonTerminalJob.get().jobActor.tell(new JobClusterManagerProto.GetJobDefinitionUpdatedFromJobActorRequest(submitter, lastSubmittedJobId.get(), jobDefinition.get(), false, submitJobRequest.isAutoResubmit(), getSender()), getSelf());
        return true;
    }

    private JobDefinition getResolvedJobDefinition(String str, Optional<JobDefinition> optional) throws Exception {
        JobDefinition fromJobClusterDefinition;
        if (optional.isPresent()) {
            if (optional.get().getSchedulingInfo() != null && optional.get().requireInheritInstanceCheck()) {
                this.logger.warn("Job requires inheriting instance count but has no active non-terminal job.");
            }
            fromJobClusterDefinition = optional.get();
        } else {
            Optional<JobDefinition> cloneJobDefinitionForQuickSubmitFromArchivedJobs = cloneJobDefinitionForQuickSubmitFromArchivedJobs(this.jobManager.getCompletedJobsList(), Optional.empty(), this.jobStore);
            if (cloneJobDefinitionForQuickSubmitFromArchivedJobs.isPresent()) {
                this.logger.info("Inherited scheduling Info and parameters from previous job");
                fromJobClusterDefinition = cloneJobDefinitionForQuickSubmitFromArchivedJobs.get();
            } else {
                if (this.jobClusterMetadata == null || this.jobClusterMetadata.getJobClusterDefinition() == null || this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig() == null) {
                    throw new Exception("Job Definition could not retrieved from a previous submission (There may not be a previous submission)");
                }
                this.logger.info("No previous job definition found. Fall back to cluster definition: {}", this.name);
                IJobClusterDefinition jobClusterDefinition = this.jobClusterMetadata.getJobClusterDefinition();
                this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig();
                fromJobClusterDefinition = fromJobClusterDefinition(str, jobClusterDefinition);
                this.logger.info("Built job definition from cluster definition: {}", fromJobClusterDefinition);
            }
        }
        this.logger.info("Resolved JobDefn {}", fromJobClusterDefinition);
        return this.jobDefinitionResolver.getResolvedJobDefinition(str, fromJobClusterDefinition, this.jobClusterMetadata);
    }

    private JobDefinition fromJobClusterDefinition(String str, IJobClusterDefinition iJobClusterDefinition) throws InvalidJobException {
        JobClusterConfig jobClusterConfig = iJobClusterDefinition.getJobClusterConfig();
        return new JobDefinition.Builder().withJobSla(new JobSla.Builder().build()).withArtifactName(jobClusterConfig.getArtifactName()).withVersion(jobClusterConfig.getVersion()).withLabels(iJobClusterDefinition.getLabels()).withName(this.name).withParameters(iJobClusterDefinition.getParameters()).withSchedulingInfo(jobClusterConfig.getSchedulingInfo()).withUser(str).build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v20, types: [io.mantisrx.server.master.domain.JobId, long, java.lang.Object] */
    private void submitJob(JobDefinition jobDefinition, ActorRef actorRef, String str) throws PersistException {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter submitJobb");
        }
        try {
            validateJobDefinition(jobDefinition);
            ?? jobId = new JobId(this.name, this.jobClusterMetadata.getLastJobCount() + 1);
            int intSystemParameter = jobDefinition.getIntSystemParameter("mantis.job.worker.heartbeat.interval.secs", 0);
            int intSystemParameter2 = jobDefinition.getIntSystemParameter("mantis.job.worker.timeout.secs", 0);
            this.logger.info("Creating new job id: {} with job defn {}, with heartbeat {} and workertimeout {}", new Object[]{jobId, jobDefinition, Integer.valueOf(intSystemParameter), Integer.valueOf(intSystemParameter2)});
            MantisJobMetadataImpl build = new MantisJobMetadataImpl.Builder().withJobId(jobId).withSubmittedAt(Instant.now()).withJobState(JobState.Accepted).withNextWorkerNumToUse(1).withJobDefinition(jobDefinition).withHeartbeatIntervalSecs(intSystemParameter).withWorkerTimeoutSecs(intSystemParameter2).build();
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_SUBMIT, jobId.getId(), ((Object) jobId) + " submitter: " + str));
            this.jobManager.initJob(build, this.jobClusterMetadata, actorRef);
            this.numJobActorCreationCounter.increment();
            this.jobClusterMetadata = new JobClusterMetadataImpl.Builder().withJobClusterDefinition((JobClusterDefinitionImpl) this.jobClusterMetadata.getJobClusterDefinition()).withLastJobCount(jobId).withIsDisabled(this.jobClusterMetadata.isDisabled()).build();
            try {
                this.jobStore.updateJobCluster(this.jobClusterMetadata);
                this.jobIdSubmissionSubject.onNext((Object) jobId);
                this.numJobSubmissions.increment();
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Exit submitJob");
                }
            } catch (Exception e) {
                this.logger.error("Failed to persist job cluster {} error {}", new Object[]{this.jobClusterMetadata, e.getMessage(), e});
                this.numJobSubmissionFailures.increment();
                cleanUpOnJobSubmitFailure(jobId);
                throw new PersistException(e);
            }
        } catch (PersistException e2) {
            throw e2;
        } catch (InvalidJobRequest e3) {
            this.logger.error("Invalid jobcluster : {} error {}", new Object[]{this.jobClusterMetadata, e3.getMessage(), e3});
            this.numJobSubmissionFailures.increment();
            throw new IllegalArgumentException(e3);
        } catch (Exception e4) {
            this.logger.error("Exception persisting job in store", e4);
            this.numJobSubmissionFailures.increment();
            cleanUpOnJobSubmitFailure(null);
            throw new IllegalStateException(e4);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobInitialized(JobProto.JobInitialized jobInitialized) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobInitialized");
        }
        this.jobManager.markJobInitialized(jobInitialized.jobId, System.currentTimeMillis());
        if (jobInitialized.responseCode == BaseResponse.ResponseCode.SUCCESS) {
            jobInitialized.requestor.tell(new JobClusterManagerProto.SubmitJobResponse(jobInitialized.requestId, BaseResponse.ResponseCode.SUCCESS, jobInitialized.jobId.getId(), Optional.of(jobInitialized.jobId)), getSelf());
            this.numJobsInitialized.increment();
        } else {
            this.logger.warn("Job was not initialized {}", jobInitialized);
            Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(jobInitialized.jobId);
            if (jobInfoForNonTerminalJob.isPresent()) {
                cleanUpOnJobSubmitFailure(jobInfoForNonTerminalJob.get().jobId);
                if (jobInitialized.requestor != null) {
                    jobInitialized.requestor.tell(new JobClusterManagerProto.SubmitJobResponse(jobInitialized.requestId, jobInitialized.responseCode, "Job " + jobInitialized.jobId + " submission failed", Optional.ofNullable(jobInitialized.jobId)), getSelf());
                }
            } else {
                this.logger.warn("No such job found {}", jobInitialized.jobId);
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobInitialized");
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobStarted(JobClusterProto.JobStartedEvent jobStartedEvent) {
        this.logger.info("job {} started event", jobStartedEvent.jobid);
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(jobStartedEvent.jobid);
        if (jobInfoForNonTerminalJob.isPresent()) {
            this.jobManager.markJobStarted(jobInfoForNonTerminalJob.get());
            getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), Optional.of(jobInfoForNonTerminalJob.get().jobDefinition)), getSelf());
        }
    }

    private void cleanUpOnJobSubmitFailure(JobId jobId) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter cleanUpOnJobSubmitFailure {}", jobId);
        }
        if (jobId != null) {
            Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(jobId);
            if (jobInfoForNonTerminalJob.isPresent()) {
                JobInfo jobInfo = jobInfoForNonTerminalJob.get();
                if (this.jobManager.markJobTerminating(jobInfo, JobState.Failed)) {
                    getContext().unwatch(jobInfo.jobActor);
                    getContext().stop(jobInfo.jobActor);
                    this.jobManager.markCompleted(jobId, Optional.empty(), JobState.Failed);
                    this.jobManager.markJobInitialized(jobId, System.currentTimeMillis());
                } else {
                    this.logger.warn("cleanup on Job Submit failure failed for job {}", jobId);
                }
            }
        } else {
            this.logger.warn("cleanup on Job Submit failure failed as there was no JobId");
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit cleanUpOnJobSubmitFailure {}", jobId);
        }
    }

    private void validateJobDefinition(JobDefinition jobDefinition) throws InvalidJobRequest {
        if (jobDefinition == null) {
            throw new InvalidJobRequest((JobRequest) null, "MantisJobDefinition cannot be null");
        }
        if (jobDefinition.getArtifactName() == null) {
            throw new InvalidJobRequest((JobRequest) null, "MantisJobDefinition job artifactName attribute cannot be null");
        }
        if (jobDefinition.getName() == null) {
            throw new InvalidJobRequest((JobRequest) null, "MantisJobDefinition name attribute cannot be null");
        }
        if (jobDefinition.getSchedulingInfo() == null) {
            throw new InvalidJobRequest((JobRequest) null, "MantisJobDefinition schedulingInfo cannot be null");
        }
        for (StageSchedulingInfo stageSchedulingInfo : jobDefinition.getSchedulingInfo().getStages().values()) {
            validateConstraints(stageSchedulingInfo.getSoftConstraints(), stageSchedulingInfo.getHardConstraints());
        }
    }

    private void validateConstraints(List<JobConstraints> list, List<JobConstraints> list2) throws InvalidJobRequest {
        if (list != null) {
            for (JobConstraints jobConstraints : list) {
                if (ConstraintsEvaluators.softConstraint(jobConstraints, new HashSet()) == null) {
                    this.logger.error("Invalid Soft Job Constraint {}", jobConstraints);
                    throw new InvalidJobRequest((JobRequest) null, "Unknown constraint " + jobConstraints);
                }
            }
        }
        if (list2 != null) {
            for (JobConstraints jobConstraints2 : list2) {
                if (ConstraintsEvaluators.hardConstraint(jobConstraints2, new HashSet()) == null) {
                    this.logger.error("Invalid Hard Job Constraint {}", jobConstraints2);
                    throw new InvalidJobRequest((JobRequest) null, "Unknown constraint " + jobConstraints2);
                }
            }
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onWorkerEvent(WorkerEvent workerEvent) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onWorkerEvent {}", workerEvent);
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(workerEvent.getWorkerId().getJobId());
        if (jobInfoForNonTerminalJob.isPresent()) {
            jobInfoForNonTerminalJob.get().jobActor.forward(workerEvent, getContext());
        } else if (JobHelper.isTerminalWorkerEvent(workerEvent)) {
            this.logger.warn("Terminal Event from worker {} has no valid running job. Ignoring event ", workerEvent.getWorkerId());
        } else {
            this.logger.warn("Event from worker {} has no valid running job. Terminating worker ", workerEvent.getWorkerId());
            Optional<String> workerHostFromWorkerEvent = JobHelper.getWorkerHostFromWorkerEvent(workerEvent);
            Optional<IMantisJobMetadata> jobDataForCompletedJob = this.jobManager.getJobDataForCompletedJob(workerEvent.getWorkerId().getJobId());
            if (jobDataForCompletedJob.isPresent()) {
                this.mantisSchedulerFactory.forJob(jobDataForCompletedJob.get().getJobDefinition()).unscheduleAndTerminateWorker(workerEvent.getWorkerId(), workerHostFromWorkerEvent);
            } else {
                this.logger.warn("Non-terminal Event from worker {} has no completed job. Sending event to default cluster", workerEvent.getWorkerId());
                this.mantisSchedulerFactory.forClusterID(null).unscheduleAndTerminateWorker(workerEvent.getWorkerId(), workerHostFromWorkerEvent);
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onWorkerEvent {}", workerEvent);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onResubmitWorkerRequest(JobClusterManagerProto.ResubmitWorkerRequest resubmitWorkerRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onResubmitWorkerRequest {}", resubmitWorkerRequest);
        }
        onResubmitWorker(resubmitWorkerRequest);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onResubmitWorkerRequest {}", resubmitWorkerRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobKillRequest(JobClusterProto.KillJobRequest killJobRequest) {
        this.logger.info("JobClusterActor.onKillJobRequest {}", killJobRequest);
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(killJobRequest.jobId);
        getSender();
        if (jobInfoForNonTerminalJob.isPresent() && this.jobManager.markJobTerminating(jobInfoForNonTerminalJob.get(), JobState.Failed)) {
            jobInfoForNonTerminalJob.get().jobActor.tell(killJobRequest, getSelf());
            return;
        }
        this.logger.info("Job {} not found", killJobRequest.jobId.getId());
        if (killJobRequest.requestor != null) {
            killJobRequest.requestor.tell(new JobClusterManagerProto.KillJobResponse(killJobRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, JobState.Noop, "Job " + killJobRequest.jobId + " not found", killJobRequest.jobId, killJobRequest.user), getSelf());
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onKillJobResponse(JobClusterProto.KillJobResponse killJobResponse) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onKillJobResponse {}", killJobResponse);
        }
        if (killJobResponse.responseCode == BaseResponse.ResponseCode.SUCCESS) {
            Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(killJobResponse.jobId);
            if (jobInfoForNonTerminalJob.isPresent()) {
                getContext().unwatch(jobInfoForNonTerminalJob.get().jobActor);
                this.numJobShutdowns.increment();
                this.logger.info("Marking job {} as terminated", jobInfoForNonTerminalJob.get().jobId);
                if (killJobResponse.requestor != null && !getSelf().equals(killJobResponse.requestor)) {
                    killJobResponse.requestor.tell(new JobClusterManagerProto.KillJobResponse(killJobResponse.requestId, killJobResponse.responseCode, killJobResponse.state, killJobResponse.message, killJobResponse.jobId, killJobResponse.user), getSelf());
                }
                Optional<JobClusterDefinitionImpl.CompletedJob> markCompleted = this.jobManager.markCompleted(killJobResponse.jobId, killJobResponse.jobMetadata, killJobResponse.state);
                if (markCompleted.isPresent()) {
                    this.logger.info("In cleanupAfterJobKill for Job {} in state {} and metadata {} ", new Object[]{killJobResponse.jobId, killJobResponse.state, killJobResponse.jobMetadata});
                    if (!this.jobClusterMetadata.isDisabled()) {
                        SLA sla = this.jobClusterMetadata.getJobClusterDefinition().getSLA();
                        if (sla.getMin() == 0 && sla.getMax() == 0) {
                            this.logger.info("{} No SLA specified nothing to enforce {}", markCompleted.get().getJobId(), sla);
                        } else {
                            try {
                                Optional<IMantisJobMetadata> optional = killJobResponse.jobMetadata;
                                if (optional == null || !optional.isPresent()) {
                                    optional = this.jobStore.getArchivedJob(markCompleted.get().getJobId());
                                }
                                if (optional == null || !optional.isPresent()) {
                                    this.logger.warn("Could not load last terminated job to use for triggering enforce SLA");
                                } else {
                                    getSelf().tell(new JobClusterProto.EnforceSLARequest(Instant.now(), Optional.of(optional.get().getJobDefinition())), ActorRef.noSender());
                                }
                            } catch (Exception e) {
                                this.logger.warn("Exception {} loading completed Job {} to enforce SLA due", new Object[]{e.getMessage(), markCompleted.get().getJobId(), e});
                            }
                        }
                    }
                } else {
                    this.logger.warn("Unable to mark job {} completed. ", killJobResponse.jobId);
                }
            } else if (killJobResponse.requestor != null && !getSelf().equals(killJobResponse.requestor)) {
                killJobResponse.requestor.tell(new JobClusterManagerProto.KillJobResponse(killJobResponse.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, JobState.Noop, "Job not found", killJobResponse.jobId, killJobResponse.user), getSelf());
            }
        } else if (killJobResponse.requestor != null && !getSelf().equals(killJobResponse.requestor)) {
            killJobResponse.requestor.tell(new JobClusterManagerProto.KillJobResponse(killJobResponse.requestId, killJobResponse.responseCode, killJobResponse.state, killJobResponse.message, killJobResponse.jobId, killJobResponse.user), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onKillJobResponse {}", killJobResponse);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onGetJobDetailsRequest(JobClusterManagerProto.GetJobDetailsRequest getJobDetailsRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter GetJobDetails {}", getJobDetailsRequest);
        }
        JobClusterManagerProto.GetJobDetailsResponse getJobDetailsResponse = new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "Job " + getJobDetailsRequest.getJobId() + "  not found", Optional.empty());
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(getJobDetailsRequest.getJobId());
        if (jobInfoForNonTerminalJob.isPresent()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Forwarding getJobDetails to job actor for {}", getJobDetailsRequest.getJobId());
            }
            jobInfoForNonTerminalJob.get().jobActor.forward(getJobDetailsRequest, getContext());
            return;
        }
        if (this.jobManager.getCompletedJob(getJobDetailsRequest.getJobId()).isPresent()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Found Job {} in completed state ", getJobDetailsRequest.getJobId());
            }
            try {
                Optional<IMantisJobMetadata> archivedJob = this.jobStore.getArchivedJob(getJobDetailsRequest.getJobId().getId());
                getJobDetailsResponse = archivedJob.isPresent() ? new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", archivedJob) : new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "Job " + getJobDetailsRequest.getJobId() + "  not found", Optional.empty());
            } catch (Exception e) {
                this.logger.warn("Exception {} reading Job {} from Storage ", new Object[]{e.getMessage(), getJobDetailsRequest.getJobId(), e});
                getJobDetailsResponse = new JobClusterManagerProto.GetJobDetailsResponse(getJobDetailsRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Exception reading Job " + getJobDetailsRequest.getJobId() + "  " + e.getMessage(), Optional.empty());
            }
        } else {
            this.logger.warn("No such job {} ", getJobDetailsRequest.getJobId());
        }
        getSender().tell(getJobDetailsResponse, getSelf());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit GetJobDetails {}", getJobDetailsRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onGetLatestJobDiscoveryInfo(JobClusterManagerProto.GetLatestJobDiscoveryInfoRequest getLatestJobDiscoveryInfoRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetLatestJobDiscoveryInfo {}", getLatestJobDiscoveryInfoRequest);
        }
        ActorRef sender = getSender();
        if (this.name.equals(getLatestJobDiscoveryInfoRequest.getJobCluster())) {
            JobId jobId = (JobId) this.jobIdSubmissionSubject.getValue();
            this.logger.debug("[{}] latest job Id for cluster: {}", this.name, jobId);
            if (jobId != null) {
                Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(jobId);
                if (jobInfoForNonTerminalJob.isPresent()) {
                    jobInfoForNonTerminalJob.get().jobActor.forward(getLatestJobDiscoveryInfoRequest, getContext());
                } else {
                    this.logger.info("job info not found for job ID when looking up discovery info: {}", jobId);
                    sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, "JobInfo not found when looking up discovery info for " + jobId, Optional.empty()), getSelf());
                }
            } else {
                this.logger.debug("no latest Job ID found for job cluster {}", this.name);
                sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR_NOT_FOUND, "No latest jobId found for job cluster " + this.name, Optional.empty()), getSelf());
            }
        } else {
            String str = "Job Cluster " + getLatestJobDiscoveryInfoRequest.getJobCluster() + " In request does not match the name of this actor " + this.name;
            this.logger.warn(str);
            sender.tell(new JobClusterManagerProto.GetLatestJobDiscoveryInfoResponse(getLatestJobDiscoveryInfoRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, str, Optional.empty()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetLatestJobDiscoveryInfo {}", getLatestJobDiscoveryInfoRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onGetJobStatusSubject(JobClusterManagerProto.GetJobSchedInfoRequest getJobSchedInfoRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetJobStatusSubject {}", getJobSchedInfoRequest);
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(getJobSchedInfoRequest.getJobId());
        if (jobInfoForNonTerminalJob.isPresent()) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Forwarding getJobDetails to job actor for {}", getJobSchedInfoRequest.getJobId());
            }
            jobInfoForNonTerminalJob.get().jobActor.forward(getJobSchedInfoRequest, getContext());
        } else {
            getSender().tell(new JobClusterManagerProto.GetJobSchedInfoResponse(getJobSchedInfoRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + getJobSchedInfoRequest.getJobId() + "  not found or not active", Optional.empty()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetJobStatusSubject ");
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onGetLastSubmittedJobIdSubject(JobClusterManagerProto.GetLastSubmittedJobIdStreamRequest getLastSubmittedJobIdStreamRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onGetLastSubmittedJobIdSubject {}", getLastSubmittedJobIdStreamRequest);
        }
        ActorRef sender = getSender();
        if (this.name.equals(getLastSubmittedJobIdStreamRequest.getClusterName())) {
            sender.tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(getLastSubmittedJobIdStreamRequest.requestId, BaseResponse.ResponseCode.SUCCESS, "", Optional.of(this.jobIdSubmissionSubject)), getSelf());
        } else {
            String str = "Job Cluster " + getLastSubmittedJobIdStreamRequest.getClusterName() + " In request does not match the name of this actor " + this.name;
            this.logger.warn(str);
            sender.tell(new JobClusterManagerProto.GetLastSubmittedJobIdStreamResponse(getLastSubmittedJobIdStreamRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, str, Optional.empty()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onGetLastSubmittedJobIdSubject {}", getLastSubmittedJobIdStreamRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onBookkeepingRequest(JobClusterProto.BookkeepingRequest bookkeepingRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onBookkeepingRequest for JobCluster {}", this.name);
        }
        onEnforceSLARequest(new JobClusterProto.EnforceSLARequest());
        this.jobManager.actorToJobIdMap.keySet().forEach(actorRef -> {
            actorRef.tell(new JobProto.MigrateDisabledVmWorkersRequest(bookkeepingRequest.time), ActorRef.noSender());
        });
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onBookkeepingRequest for JobCluster {}", this.name);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onEnforceSLARequest(JobClusterProto.EnforceSLARequest enforceSLARequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onEnforceSLA for JobCluster {} with request", this.name, enforceSLARequest);
        }
        this.numSLAEnforcementExecutions.increment();
        long epochMilli = enforceSLARequest.timeOfEnforcement.toEpochMilli();
        this.jobManager.getJobActorsStuckInInit(epochMilli, getExpirePendingInitializeDelayMs());
        this.jobManager.getJobsStuckInAccepted(epochMilli, getExpireAcceptedDelayMs());
        this.jobManager.getJobsStuckInTerminating(epochMilli, getExpireAcceptedDelayMs());
        if (this.slaEnforcer.hasSLA()) {
            int activeJobsCount = this.jobManager.activeJobsCount();
            int acceptedJobsCount = this.jobManager.acceptedJobsCount();
            int enforceSLAMin = this.slaEnforcer.enforceSLAMin(activeJobsCount, acceptedJobsCount);
            if (enforceSLAMin > 0) {
                this.logger.info("Submitting {} jobs for job name {} as active count is {} and accepted count is {}", new Object[]{Integer.valueOf(enforceSLAMin), this.name, Integer.valueOf(activeJobsCount), Integer.valueOf(acceptedJobsCount)});
                String str = StringConstants.MANTIS_MASTER_USER;
                if (enforceSLARequest.jobDefinitionOp.isPresent()) {
                    str = enforceSLARequest.jobDefinitionOp.get().getUser();
                }
                for (int i = 0; i < enforceSLAMin; i++) {
                    getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, str, true, enforceSLARequest.jobDefinitionOp), getSelf());
                }
            } else {
                ArrayList arrayList = new ArrayList(activeJobsCount + acceptedJobsCount);
                arrayList.addAll(this.jobManager.getActiveJobsList());
                arrayList.addAll(this.jobManager.getAcceptedJobsList());
                for (JobId jobId : this.slaEnforcer.enforceSLAMax(Collections.unmodifiableList(arrayList))) {
                    this.logger.info("Request termination for job {}", jobId);
                    getSelf().tell(new JobClusterProto.KillJobRequest(jobId, "SLA enforcement", JobCompletedReason.Killed, StringConstants.MANTIS_MASTER_USER, ActorRef.noSender()), getSelf());
                }
            }
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit onEnforceSLA for JobCluster {}", this.name);
            }
        }
    }

    private long getExpireAcceptedDelayMs() {
        return 600000L;
    }

    private Optional<JobDefinition> cloneToNewJobDefinitionWithoutArtifactNameAndVersion(JobDefinition jobDefinition) {
        try {
            return Optional.of(new JobDefinition.Builder().withJobSla(jobDefinition.getJobSla()).withLabels(jobDefinition.getLabels()).withName(jobDefinition.getName()).withParameters(jobDefinition.getParameters()).withSchedulingInfo(jobDefinition.getSchedulingInfo()).withNumberOfStages(jobDefinition.getNumberOfStages()).withSubscriptionTimeoutSecs(jobDefinition.getSubscriptionTimeoutSecs()).withUser(jobDefinition.getUser()).build());
        } catch (Exception e) {
            this.logger.warn("Could not clone JobDefinition {} due to {}", new Object[]{jobDefinition, e.getMessage(), e});
            e.printStackTrace();
            return Optional.empty();
        }
    }

    private Optional<JobDefinition> cloneJobDefinitionForQuickSubmitFromArchivedJobs(List<JobClusterDefinitionImpl.CompletedJob> list, Optional<JobDefinition> optional, MantisJobStore mantisJobStore) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters");
        }
        Optional<JobDefinition> lastSubmittedJobDefinition = getLastSubmittedJobDefinition(list, optional, mantisJobStore);
        if (lastSubmittedJobDefinition.isPresent()) {
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters");
            }
            return cloneToNewJobDefinitionWithoutArtifactNameAndVersion(lastSubmittedJobDefinition.get());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit createNewJobDefinitionFromLastSubmittedInheritSchedInfoAndParameters empty");
        }
        return Optional.empty();
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onExpireOldJobs(JobClusterProto.ExpireOldJobsRequest expireOldJobsRequest) {
        this.jobManager.purgeOldCompletedJobs(System.currentTimeMillis() - (getTerminatedJobToDeleteDelayHours() * 3600000));
    }

    private long getExpirePendingInitializeDelayMs() {
        return 60000L;
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onTriggerCron(JobClusterProto.TriggerCronRequest triggerCronRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onTriggerCron for Job Cluster {}", this.name);
        }
        if (this.jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() != null) {
            if (this.jobClusterMetadata.getJobClusterDefinition().getSLA().getCronPolicy() == IJobClusterDefinition.CronPolicy.KEEP_NEW || this.jobManager.getAllNonTerminalJobsList().size() == 0) {
                getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, StringConstants.MANTIS_MASTER_USER, (Optional<JobDefinition>) Optional.empty(), false), getSelf());
            } else {
                this.logger.info(this.name + ": Skipping submitting new job upon cron trigger, one exists already");
            }
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onTriggerCron Triggered for Job Cluster {}", this.name);
        }
    }

    private long getTerminatedJobToDeleteDelayHours() {
        return ConfigurationProvider.getConfig().getTerminatedJobToDeleteDelayHours();
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdateSLA(JobClusterManagerProto.UpdateJobClusterSLARequest updateJobClusterSLARequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterUpdateSLA {}", updateJobClusterSLARequest);
        }
        ActorRef sender = getSender();
        try {
            SLA sla = new SLA(updateJobClusterSLARequest.getMin(), updateJobClusterSLARequest.getMax(), updateJobClusterSLARequest.getCronSpec(), updateJobClusterSLARequest.getCronPolicy());
            JobClusterDefinitionImpl build = new JobClusterDefinitionImpl.Builder().from(this.jobClusterMetadata.getJobClusterDefinition()).withSla(sla).build();
            boolean isDisabled = this.jobClusterMetadata.isDisabled();
            if (updateJobClusterSLARequest.isForceEnable() && this.jobClusterMetadata.isDisabled()) {
                isDisabled = false;
            }
            updateAndSaveJobCluster(new JobClusterMetadataImpl.Builder().withIsDisabled(isDisabled).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(build).build());
            if (this.cronManager != null) {
                this.cronManager.destroyCron();
            }
            this.cronManager = new CronManager(this.name, getSelf(), sla);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(updateJobClusterSLARequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " SLA updated"), getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " SLA update"));
        } catch (IllegalArgumentException e) {
            this.logger.error("Invalid arguement job cluster not updated ", e);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(updateJobClusterSLARequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, this.name + " Job cluster SLA updation failed " + e.getMessage()), getSelf());
        } catch (Exception e2) {
            this.logger.error("job cluster not updated ", e2);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterSLAResponse(updateJobClusterSLARequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster SLA updation failed " + e2.getMessage()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterUpdateSLA {}", updateJobClusterSLARequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdateLabels(JobClusterManagerProto.UpdateJobClusterLabelsRequest updateJobClusterLabelsRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter onJobClusterUpdateLabels {}", updateJobClusterLabelsRequest);
        }
        ActorRef sender = getSender();
        try {
            updateAndSaveJobCluster(new JobClusterMetadataImpl.Builder().withIsDisabled(this.jobClusterMetadata.isDisabled()).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(new JobClusterDefinitionImpl.Builder().from(this.jobClusterMetadata.getJobClusterDefinition()).withJobClusterConfig(new JobClusterConfig.Builder().from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig()).build()).withLabels(updateJobClusterLabelsRequest.getLabels()).build()).build());
            sender.tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(updateJobClusterLabelsRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " labels updated"), getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " update labels"));
        } catch (Exception e) {
            this.logger.error("job cluster labels not updated ", e);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterLabelsResponse(updateJobClusterLabelsRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " labels updation failed " + e.getMessage()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onJobClusterUpdateLabels {}", updateJobClusterLabelsRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdateArtifact(JobClusterManagerProto.UpdateJobClusterArtifactRequest updateJobClusterArtifactRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:onJobClusterUpdateArtifact");
        }
        ActorRef sender = getSender();
        try {
        } catch (Exception e) {
            this.logger.error("job cluster not updated ", e);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateJobClusterArtifactRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster artifact updation failed " + e.getMessage()), getSelf());
        }
        if (!isVersionUnique(updateJobClusterArtifactRequest.getVersion(), this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) {
            String format = String.format("job cluster %s not updated as the version %s is not unique", this.name, updateJobClusterArtifactRequest.getVersion());
            this.logger.error(format);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateJobClusterArtifactRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, format), getSelf());
            return;
        }
        updateJobClusterConfig(new JobClusterConfig.Builder().from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig()).withArtifactName(updateJobClusterArtifactRequest.getArtifactName()).withVersion(updateJobClusterArtifactRequest.getVersion()).withUploadedAt(System.currentTimeMillis()).build());
        if (!updateJobClusterArtifactRequest.isSkipSubmit()) {
            getSelf().tell(new JobClusterManagerProto.SubmitJobRequest(this.name, updateJobClusterArtifactRequest.getUser(), (Optional<JobDefinition>) Optional.empty(), false), getSelf());
        }
        sender.tell(new JobClusterManagerProto.UpdateJobClusterArtifactResponse(updateJobClusterArtifactRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " artifact updated"), getSelf());
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onJobClusterUpdateArtifact");
        }
    }

    private void updateJobClusterConfig(JobClusterConfig jobClusterConfig) throws Exception {
        updateAndSaveJobCluster(new JobClusterMetadataImpl.Builder().withIsDisabled(this.jobClusterMetadata.isDisabled()).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(new JobClusterDefinitionImpl.Builder().from(this.jobClusterMetadata.getJobClusterDefinition()).withJobClusterConfig(jobClusterConfig).build()).build());
        this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " artifact update"));
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdateSchedulingInfo(JobClustersManagerActor.UpdateSchedulingInfo updateSchedulingInfo) {
        ActorRef sender = getSender();
        try {
            if (isVersionUnique(updateSchedulingInfo.getVersion(), this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfigs())) {
                updateJobClusterConfig(new JobClusterConfig.Builder().from(this.jobClusterMetadata.getJobClusterDefinition().getJobClusterConfig()).withVersion(updateSchedulingInfo.getVersion()).withSchedulingInfo(updateSchedulingInfo.getSchedulingInfo()).withUploadedAt(System.currentTimeMillis()).build());
                sender.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(updateSchedulingInfo.getRequestId(), BaseResponse.ResponseCode.SUCCESS, this.name + " schedulingInfo updated"), getSelf());
            } else {
                String format = String.format("job cluster %s not updated as the version %s is not unique", this.name, updateSchedulingInfo.getVersion());
                this.logger.error(format);
                sender.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(updateSchedulingInfo.getRequestId(), BaseResponse.ResponseCode.CLIENT_ERROR, format), getSelf());
            }
        } catch (Exception e) {
            this.logger.error("job cluster not updated ", e);
            sender.tell(new JobClusterManagerProto.UpdateSchedulingInfoResponse(updateSchedulingInfo.getRequestId(), BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster schedulingInfo update failed " + e.getMessage()), getSelf());
        }
    }

    boolean isVersionUnique(String str, List<JobClusterConfig> list) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Enter JobClusterActor {} isVersionnique {} existing versions {}", new Object[]{this.name, str, list});
        }
        Iterator<JobClusterConfig> it = list.iterator();
        while (it.hasNext()) {
            if (it.next().getVersion().equals(str)) {
                this.logger.info("Given Version {} is not unique during UpdateJobCluster {}", str, this.name);
                return false;
            }
        }
        return true;
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onJobClusterUpdateWorkerMigrationConfig(JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyRequest updateJobClusterWorkerMigrationStrategyRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", updateJobClusterWorkerMigrationStrategyRequest);
        }
        ActorRef sender = getSender();
        try {
            updateAndSaveJobCluster(new JobClusterMetadataImpl.Builder().withIsDisabled(this.jobClusterMetadata.isDisabled()).withLastJobCount(this.jobClusterMetadata.getLastJobCount()).withJobClusterDefinition(new JobClusterDefinitionImpl.Builder().from(this.jobClusterMetadata.getJobClusterDefinition()).withMigrationConfig(updateJobClusterWorkerMigrationStrategyRequest.getMigrationConfig()).build()).build());
            sender.tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(updateJobClusterWorkerMigrationStrategyRequest.requestId, BaseResponse.ResponseCode.SUCCESS, this.name + " worker migration config updated"), getSelf());
            this.eventPublisher.publishAuditEvent(new LifecycleEventsProto.AuditEvent(LifecycleEventsProto.AuditEvent.AuditEventType.JOB_CLUSTER_UPDATE, this.jobClusterMetadata.getJobClusterDefinition().getName(), this.name + " worker migration config update"));
        } catch (Exception e) {
            this.logger.error("job cluster migration config not updated ", e);
            sender.tell(new JobClusterManagerProto.UpdateJobClusterWorkerMigrationStrategyResponse(updateJobClusterWorkerMigrationStrategyRequest.requestId, BaseResponse.ResponseCode.SERVER_ERROR, this.name + " Job cluster worker migration config updation failed " + e.getMessage()), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:onJobClusterUpdateWorkerMigrationConfig {}", updateJobClusterWorkerMigrationStrategyRequest);
        }
    }

    private void updateAndSaveJobCluster(IJobClusterMetadata iJobClusterMetadata) throws Exception {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering JobClusterActor:updateAndSaveJobCluster {}", iJobClusterMetadata.getJobClusterDefinition().getName());
        }
        this.jobStore.updateJobCluster(iJobClusterMetadata);
        this.jobClusterMetadata = iJobClusterMetadata;
        if (!this.jobClusterMetadata.isDisabled()) {
            getContext().become(this.initializedBehavior);
        }
        this.slaEnforcer = new SLAEnforcer(this.jobClusterMetadata.getJobClusterDefinition().getSLA());
        this.logger.info("successfully saved job cluster");
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JobClusterActor:updateAndSaveJobCluster {}", iJobClusterMetadata.getJobClusterDefinition().getName());
        }
    }

    private Optional<JobDefinition> getLastSubmittedJobDefinition(List<JobClusterDefinitionImpl.CompletedJob> list, Optional<JobDefinition> optional, MantisJobStore mantisJobStore) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Entering getLastSubmittedJobDefinition");
        }
        if (optional.isPresent()) {
            return optional;
        }
        Optional<JobId> lastSubmittedJobId = JobListHelper.getLastSubmittedJobId(Collections.emptyList(), list);
        if (lastSubmittedJobId.isPresent()) {
            Optional<JobClusterDefinitionImpl.CompletedJob> completedJob = this.jobManager.getCompletedJob(lastSubmittedJobId.get());
            if (completedJob.isPresent()) {
                try {
                    Optional<IMantisJobMetadata> archivedJob = mantisJobStore.getArchivedJob(completedJob.get().getJobId());
                    if (archivedJob.isPresent()) {
                        if (this.logger.isTraceEnabled()) {
                            this.logger.trace("Exit getLastSubmittedJobDefinition returning job {} with defn {}", archivedJob.get().getJobId(), archivedJob.get().getJobDefinition());
                        }
                        return Optional.of(archivedJob.get().getJobDefinition());
                    }
                    this.logger.warn("Could not find load archived Job {} for cluster {}", completedJob.get().getJobId(), this.name);
                } catch (Exception e) {
                    this.logger.warn("Archived Job {} could not be loaded from the store due to {} ", completedJob.get().getJobId(), e.getMessage());
                }
            } else {
                this.logger.warn("Could not find any previous submitted/completed Job for cluster {}", this.name);
            }
        } else {
            this.logger.warn("Could not find any previous submitted Job for cluster {}", this.name);
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit getLastSubmittedJobDefinition empty");
        }
        return Optional.empty();
    }

    private void onTerminated(Terminated terminated) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("onTerminatedEvent {} ", terminated);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onScaleStage(JobClusterManagerProto.ScaleStageRequest scaleStageRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onScaleStage {}", scaleStageRequest);
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(scaleStageRequest.getJobId());
        ActorRef sender = getSender();
        if (jobInfoForNonTerminalJob.isPresent()) {
            jobInfoForNonTerminalJob.get().jobActor.forward(scaleStageRequest, getContext());
        } else {
            sender.tell(new JobClusterManagerProto.ScaleStageResponse(scaleStageRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + scaleStageRequest.getJobId() + " not found. Could not scale stage to " + scaleStageRequest.getNumWorkers(), 0), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit onScaleStage {}", scaleStageRequest);
        }
    }

    @Override // io.mantisrx.master.jobcluster.IJobClusterManager
    public void onResubmitWorker(JobClusterManagerProto.ResubmitWorkerRequest resubmitWorkerRequest) {
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JCA:onResubmitWorker {}", resubmitWorkerRequest);
        }
        Optional<JobInfo> jobInfoForNonTerminalJob = this.jobManager.getJobInfoForNonTerminalJob(resubmitWorkerRequest.getJobId());
        ActorRef sender = getSender();
        if (jobInfoForNonTerminalJob.isPresent()) {
            jobInfoForNonTerminalJob.get().jobActor.forward(resubmitWorkerRequest, getContext());
        } else {
            sender.tell(new JobClusterManagerProto.ResubmitWorkerResponse(resubmitWorkerRequest.requestId, BaseResponse.ResponseCode.CLIENT_ERROR, "Job " + resubmitWorkerRequest.getJobId() + " not found. Could not resubmit worker"), getSelf());
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Exit JCA:onResubmitWorker {}", resubmitWorkerRequest);
        }
    }
}
