package io.mantisrx.server.worker;

import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Status;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/ExecuteStageRequestService.class */
public class ExecuteStageRequestService extends BaseService {
    private static final Logger logger = LoggerFactory.getLogger(ExecuteStageRequestService.class);
    private Observable<WrappedExecuteStageRequest> executeStageRequestObservable;
    private Observer<Observable<Status>> tasksStatusObserver;
    private WorkerExecutionOperations executionOperations;
    private Optional<String> jobProviderClass;
    private Subscription subscription;

    public ExecuteStageRequestService(Observable<WrappedExecuteStageRequest> observable, Observer<Observable<Status>> observer, WorkerExecutionOperations workerExecutionOperations, Optional<String> optional) {
        this.executeStageRequestObservable = observable;
        this.tasksStatusObserver = observer;
        this.executionOperations = workerExecutionOperations;
        this.jobProviderClass = optional;
    }

    public void start() {
        this.subscription = this.executeStageRequestObservable.map(new Func1<WrappedExecuteStageRequest, TrackedExecuteStageRequest>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.3
            public TrackedExecuteStageRequest call(WrappedExecuteStageRequest wrappedExecuteStageRequest) {
                PublishSubject create = PublishSubject.create();
                ExecuteStageRequestService.this.tasksStatusObserver.onNext(create);
                return new TrackedExecuteStageRequest(wrappedExecuteStageRequest, create);
            }
        }).flatMap(new Func1<TrackedExecuteStageRequest, Observable<ExecutionDetails>>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.2
            public Observable<ExecutionDetails> call(TrackedExecuteStageRequest trackedExecuteStageRequest) {
                Job jobInstance;
                ExecuteStageRequest request = trackedExecuteStageRequest.getExecuteRequest().getRequest();
                String file = request.getJobJarUrl().getFile();
                file.substring(file.lastIndexOf(47) + 1);
                try {
                    URL url = Paths.get(Paths.get("/tmp", "mantis-jobs", request.getJobId(), Integer.toString(request.getWorkerNumber()), "libs").toString(), "*").toUri().toURL();
                    ExecuteStageRequestService.logger.info("Creating job classpath with pathLocation " + url);
                    URLClassLoader newInstance = URLClassLoader.newInstance(new URL[]{url});
                    try {
                        if (ExecuteStageRequestService.this.jobProviderClass.isPresent()) {
                            ExecuteStageRequestService.logger.info("loading job main class " + ((String) ExecuteStageRequestService.this.jobProviderClass.get()));
                            jobInstance = ((MantisJobProvider) Class.forName((String) ExecuteStageRequestService.this.jobProviderClass.get()).newInstance()).getJobInstance();
                        } else {
                            ExecuteStageRequestService.logger.info("using serviceLoader to get job instance");
                            jobInstance = ((MantisJobProvider) ServiceLoader.load(MantisJobProvider.class, newInstance).iterator().next()).getJobInstance();
                        }
                        ExecuteStageRequestService.logger.info("Executing job");
                        return Observable.just(new ExecutionDetails(trackedExecuteStageRequest.getExecuteRequest(), trackedExecuteStageRequest.getStatus(), jobInstance, newInstance, request.getParameters()));
                    } catch (Throwable th) {
                        ExecuteStageRequestService.logger.error("Failed to load job class", th);
                        trackedExecuteStageRequest.getStatus().onError(th);
                        return Observable.empty();
                    }
                } catch (MalformedURLException e) {
                    ExecuteStageRequestService.logger.error("Failed to convert path location to URL", e);
                    trackedExecuteStageRequest.getStatus().onError(e);
                    return Observable.empty();
                }
            }
        }).subscribe(new Observer<ExecutionDetails>() { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.1
            public void onCompleted() {
                ExecuteStageRequestService.logger.error("Execute stage observable completed");
                ExecuteStageRequestService.this.executionOperations.shutdownStage();
            }

            public void onError(Throwable th) {
                ExecuteStageRequestService.logger.error("Execute stage observable threw exception", th);
            }

            public void onNext(final ExecutionDetails executionDetails) {
                ExecuteStageRequestService.logger.info("Executing stage for job ID: " + executionDetails.getExecuteStageRequest().getRequest().getJobId());
                Thread thread = new Thread("Mantis Worker Thread for " + executionDetails.getExecuteStageRequest().getRequest().getJobId()) { // from class: io.mantisrx.server.worker.ExecuteStageRequestService.1.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            ExecuteStageRequestService.this.executionOperations.executeStage(executionDetails);
                        } catch (Throwable th) {
                            ExecuteStageRequestService.logger.error("Failed to execute job stage", th);
                        }
                    }
                };
                thread.setContextClassLoader(executionDetails.getClassLoader());
                thread.setDaemon(true);
                thread.start();
            }
        });
    }

    public void shutdown() {
        this.subscription.unsubscribe();
    }

    public void enterActiveMode() {
    }
}
