package io.mantisrx.server.worker;

import io.mantisrx.common.WorkerPorts;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MachineDefinitions;
import io.mantisrx.runtime.MantisJobDurationType;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.descriptor.StageScalingPolicy;
import io.mantisrx.runtime.descriptor.StageSchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.WorkerTopologyInfo;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.mesos.MesosExecutorDriver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/VirtualMachineWorkerServiceLocalImpl.class */
public class VirtualMachineWorkerServiceLocalImpl extends BaseService implements VirtualMachineWorkerService {
    private static final Logger logger = LoggerFactory.getLogger(VirtualMachineWorkerServiceLocalImpl.class);
    private final WorkerTopologyInfo.Data workerInfo;
    private MesosExecutorDriver mesosDriver;
    private ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: io.mantisrx.server.worker.VirtualMachineWorkerServiceLocalImpl.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "vm_worker_mesos_executor_thread");
            thread.setDaemon(true);
            return thread;
        }
    });
    private Observer<WrappedExecuteStageRequest> executeStageRequestObserver;
    private Observable<VirtualMachineTaskStatus> vmTaskStatusObservable;

    public VirtualMachineWorkerServiceLocalImpl(WorkerTopologyInfo.Data data, Observer<WrappedExecuteStageRequest> observer, Observable<VirtualMachineTaskStatus> observable) {
        this.workerInfo = data;
        this.executeStageRequestObserver = observer;
        this.vmTaskStatusObservable = observable;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public WrappedExecuteStageRequest createExecuteStageRequest() throws MalformedURLException {
        URL url = new URL("file:/Users/nmahilani/Projects/Mantis/mantis-sdk/examples/sine-function/build/distributions/sine-function-1.0.zip");
        List asList = Arrays.asList(31015, 31013, 31014);
        List singletonList = Collections.singletonList(new Parameter("useRandom", "true"));
        HashMap hashMap = new HashMap();
        StageSchedulingInfo.builder().numberOfInstances(1).machineDefinition(MachineDefinitions.micro()).build();
        hashMap.put(1, StageSchedulingInfo.builder().numberOfInstances(1).machineDefinition(new MachineDefinition(2.0d, 300.0d, 200.0d, 1024.0d, 2)).scalingPolicy(new StageScalingPolicy(1, 1, 5, 1, 1, 30L, Collections.singletonMap(StageScalingPolicy.ScalingReason.Memory, new StageScalingPolicy.Strategy(StageScalingPolicy.ScalingReason.Memory, 15.0d, 25.0d, new StageScalingPolicy.RollingCount(1, 2))))).scalable(true).build());
        return new WrappedExecuteStageRequest(PublishSubject.create(), new ExecuteStageRequest(this.workerInfo.getJobName(), this.workerInfo.getJobId(), this.workerInfo.getWorkerIndex(), this.workerInfo.getWorkerNumber(), url, this.workerInfo.getStageNumber(), this.workerInfo.getNumStages(), asList, 5L, this.workerInfo.getMetricsPort(), singletonList, new SchedulingInfo(hashMap), MantisJobDurationType.Transient, 0L, 0L, 0L, new WorkerPorts(Arrays.asList(7151, 7152, 7153, 7154, 7155)), Optional.empty()));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setupRequestFailureHandler(long j, Observable<Boolean> observable, final Action0 action0) {
        observable.buffer(j, TimeUnit.SECONDS, 1).take(1).subscribe(new Observer<List<Boolean>>() { // from class: io.mantisrx.server.worker.VirtualMachineWorkerServiceLocalImpl.2
            public void onCompleted() {
            }

            public void onError(Throwable th) {
                VirtualMachineWorkerServiceLocalImpl.logger.error("onError called for request failure handler");
                action0.call();
            }

            public void onNext(List<Boolean> list) {
                VirtualMachineWorkerServiceLocalImpl.logger.info("onNext called for request failure handler with items: " + (list == null ? "-1" : Integer.valueOf(list.size())));
                if (list == null || list.isEmpty()) {
                    action0.call();
                }
            }
        });
    }

    public void start() {
        logger.info("Starting VirtualMachineWorkerServiceLocalImpl");
        Schedulers.newThread().createWorker().schedule(new Action0() { // from class: io.mantisrx.server.worker.VirtualMachineWorkerServiceLocalImpl.3
            public void call() {
                try {
                    WrappedExecuteStageRequest createExecuteStageRequest = VirtualMachineWorkerServiceLocalImpl.this.createExecuteStageRequest();
                    VirtualMachineWorkerServiceLocalImpl.this.setupRequestFailureHandler(createExecuteStageRequest.getRequest().getTimeoutToReportStart(), createExecuteStageRequest.getRequestSubject(), new Action0() { // from class: io.mantisrx.server.worker.VirtualMachineWorkerServiceLocalImpl.3.1
                        public void call() {
                            VirtualMachineWorkerServiceLocalImpl.logger.error("launch error");
                        }
                    });
                    VirtualMachineWorkerServiceLocalImpl.logger.info("onNext'ing WrappedExecuteStageRequest: {}", createExecuteStageRequest.toString());
                    VirtualMachineWorkerServiceLocalImpl.this.executeStageRequestObserver.onNext(createExecuteStageRequest);
                } catch (MalformedURLException e) {
                    e.printStackTrace();
                }
            }
        }, 2L, TimeUnit.SECONDS);
        this.vmTaskStatusObservable.subscribe(new Action1<VirtualMachineTaskStatus>() { // from class: io.mantisrx.server.worker.VirtualMachineWorkerServiceLocalImpl.4
            public void call(VirtualMachineTaskStatus virtualMachineTaskStatus) {
                VirtualMachineTaskStatus.TYPE type = virtualMachineTaskStatus.getType();
                if (type == VirtualMachineTaskStatus.TYPE.COMPLETED) {
                    VirtualMachineWorkerServiceLocalImpl.logger.info("Got COMPLETED state for " + virtualMachineTaskStatus.getTaskId());
                } else if (type == VirtualMachineTaskStatus.TYPE.STARTED) {
                    VirtualMachineWorkerServiceLocalImpl.logger.info("Would send RUNNING state to mesos, worker started for " + virtualMachineTaskStatus.getTaskId());
                }
            }
        });
    }

    public void shutdown() {
        logger.info("Unregistering Mantis Worker with Mesos executor callbacks");
        this.mesosDriver.stop();
        this.executor.shutdown();
    }

    public void enterActiveMode() {
    }
}
