package io.mantisrx.server.worker;

import io.mantisrx.common.JsonSerializer;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.loader.RuntimeTask;
import io.mantisrx.runtime.loader.SinkSubscriptionStateHandler;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.runtime.loader.config.WorkerConfigurationUtils;
import io.mantisrx.runtime.loader.config.WorkerConfigurationWritable;
import io.mantisrx.server.agent.metrics.cgroups.CgroupsMetricsCollector;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.Service;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.WrappedExecuteStageRequest;
import io.mantisrx.server.core.metrics.MetricsFactory;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.server.master.client.HighAvailabilityServicesUtil;
import io.mantisrx.server.master.client.MantisMasterGateway;
import io.mantisrx.server.master.client.TaskStatusUpdateHandler;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.server.worker.mesos.VirtualMachineTaskStatus;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractIdleService;
import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

/* loaded from: input_file:io/mantisrx/server/worker/RuntimeTaskImpl.class */
/**
 * {@link RuntimeTask} implementation that runs a single {@link ExecuteStageRequest}.
 *
 * <p>After one of the {@code initialize} overloads has been called, starting this service
 * launches a metrics server and an {@code ExecuteStageRequestService}, then publishes the
 * wrapped request to the latter. Status updates emitted on the task status subject are
 * forwarded to a {@link TaskStatusUpdateHandler} built from the master gateway.
 */
public class RuntimeTaskImpl extends AbstractIdleService implements RuntimeTask {
    private static final Logger log = LoggerFactory.getLogger(RuntimeTaskImpl.class);

    private WrappedExecuteStageRequest wrappedExecuteStageRequest;
    private WorkerConfiguration config;
    /** Services started by {@link #doRun()} and stopped by {@link #shutDown()}. */
    private final List<Service> mantisServices = new ArrayList<>();
    /** Only set by the string-based {@code initialize}; stays {@code null} for the injected variant. */
    private HighAvailabilityServices highAvailabilityServices;
    private TaskStatusUpdateHandler taskStatusUpdateHandler;
    private MantisMasterGateway masterMonitor;
    private UserCodeClassLoader userCodeClassLoader;
    private SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory;
    private final PublishSubject<Observable<Status>> tasksStatusSubject;
    private final PublishSubject<VirtualMachineTaskStatus> vmTaskStatusSubject = PublishSubject.create();
    private Optional<Job> mantisJob = Optional.empty();
    private ExecuteStageRequest executeStageRequest;

    /** Creates a task with its own, freshly created task status subject. */
    public RuntimeTaskImpl() {
        this.tasksStatusSubject = PublishSubject.create();
    }

    /**
     * Creates a task that publishes its status observables on the supplied subject.
     *
     * @param tasksStatusSubject subject that receives the status observables of this task
     */
    public RuntimeTaskImpl(PublishSubject<Observable<Status>> tasksStatusSubject) {
        this.tasksStatusSubject = tasksStatusSubject;
    }

    /**
     * Initializes the task from serialized inputs.
     *
     * <p>Parses the worker configuration and the execute-stage request, installs the cgroups
     * metrics collector on the configuration, creates the high-availability services and
     * derives the master gateway, sink-subscription handler factory and status update handler
     * from them. Finally links the status stream to the status update handler.
     *
     * @param executeStageRequestString JSON form of the {@link ExecuteStageRequest}
     * @param workerConfigurationString serialized worker configuration
     * @param userCodeClassLoader class loader used for user code
     * @throws RuntimeException wrapping the {@link IOException} if deserialization fails
     */
    public void initialize(String executeStageRequestString, String workerConfigurationString, UserCodeClassLoader userCodeClassLoader) {
        try {
            log.info("Creating runtimeTaskImpl.");
            log.info("runtimeTaskImpl workerConfigurationString: {}", workerConfigurationString);
            log.info("runtimeTaskImpl executeStageRequestString: {}", executeStageRequestString);
            JsonSerializer jsonSerializer = new JsonSerializer();
            WorkerConfigurationWritable writableConfig = WorkerConfigurationUtils.stringToWorkerConfiguration(workerConfigurationString);
            this.config = writableConfig;
            this.wrappedExecuteStageRequest = new WrappedExecuteStageRequest(PublishSubject.create(), (ExecuteStageRequest) jsonSerializer.fromJSON(executeStageRequestString, ExecuteStageRequest.class));
            log.info("Picking Cgroups metrics collector.");
            writableConfig.setMetricsCollector(CgroupsMetricsCollector.valueOf(System.getProperties()));
            this.highAvailabilityServices = HighAvailabilityServicesUtil.createHAServices(this.config);
            this.executeStageRequest = this.wrappedExecuteStageRequest.getRequest();
            this.masterMonitor = this.highAvailabilityServices.getMasterClientApi();
            this.userCodeClassLoader = userCodeClassLoader;
            this.sinkSubscriptionStateHandlerFactory = SinkSubscriptionStateHandler.Factory.forEphemeralJobsThatNeedToBeKilledInAbsenceOfSubscriber(this.highAvailabilityServices.getMasterClientApi(), Clock.systemDefaultZone());
            this.taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(this.masterMonitor);
            linkStatusToUpdateHandler();
        } catch (IOException e) {
            throw new RuntimeException("Failed to initialize RuntimeTaskImpl", e);
        }
    }

    /**
     * Initializes the task from already constructed collaborators. No high-availability
     * services are created by this variant.
     *
     * <p>NOTE(review): the decompiler reports that this method's access modifier was changed
     * from {@code protected}; it is kept {@code public} here to match the decompiled class.
     *
     * @param wrappedExecuteStageRequest request to execute
     * @param workerConfiguration worker configuration to use
     * @param mantisMasterGateway gateway used for status reporting and by the worker services
     * @param userCodeClassLoader class loader used for user code
     * @param sinkSubscriptionStateHandlerFactory factory handed to the network stage operations
     */
    public void initialize(WrappedExecuteStageRequest wrappedExecuteStageRequest, WorkerConfiguration workerConfiguration, MantisMasterGateway mantisMasterGateway, UserCodeClassLoader userCodeClassLoader, SinkSubscriptionStateHandler.Factory sinkSubscriptionStateHandlerFactory) {
        log.info("initialize RuntimeTaskImpl on injected ExecuteStageRequest: {}", wrappedExecuteStageRequest.getRequest());
        this.wrappedExecuteStageRequest = wrappedExecuteStageRequest;
        this.executeStageRequest = wrappedExecuteStageRequest.getRequest();
        this.config = workerConfiguration;
        this.masterMonitor = mantisMasterGateway;
        this.userCodeClassLoader = userCodeClassLoader;
        this.sinkSubscriptionStateHandlerFactory = sinkSubscriptionStateHandlerFactory;
        this.taskStatusUpdateHandler = TaskStatusUpdateHandler.forReportingToGateway(mantisMasterGateway);
        linkStatusToUpdateHandler();
    }

    /**
     * Forwards every status emitted by {@link #getStatus()} to the status update handler on
     * the IO scheduler. Errors on the status stream are logged instead of being left without
     * an {@code onError} handler.
     */
    private void linkStatusToUpdateHandler() {
        getStatus()
            .observeOn(Schedulers.io())
            .subscribe(
                status -> this.taskStatusUpdateHandler.onStatusUpdate(status),
                error -> log.error("Status stream of task {} terminated with an error", this, error));
    }

    /**
     * Sets the job instance that is passed to the {@code ExecuteStageRequestService} when the
     * task starts.
     *
     * @param mantisJob job to run, or an empty optional
     */
    public void setJob(Optional<Job> mantisJob) {
        this.mantisJob = mantisJob;
    }

    /**
     * Starts the high-availability services (when present and not yet running) and then the
     * worker services.
     *
     * @throws Exception any failure during start-up, after it has been logged
     */
    @Override
    protected void startUp() throws Exception {
        try {
            log.info("Starting current task {}", this);
            if (this.highAvailabilityServices != null && !this.highAvailabilityServices.isRunning()) {
                this.highAvailabilityServices.startAsync().awaitRunning();
            }
            doRun();
        } catch (Exception e) {
            log.error("Failed executing the task {}", this.executeStageRequest, e);
            throw e;
        }
    }

    /**
     * Registers and starts the metrics server and the execute-stage request service, then
     * publishes the wrapped request so that execution begins.
     *
     * @throws Exception if any service fails to start; remaining services are not started
     */
    private void doRun() throws Exception {
        PublishSubject<WrappedExecuteStageRequest> executeStageSubject = PublishSubject.create();
        this.mantisServices.add(MetricsFactory.newMetricsServer(this.config, this.executeStageRequest));
        this.mantisServices.add(new ExecuteStageRequestService(executeStageSubject, this.tasksStatusSubject, new WorkerExecutionOperationsNetworkStage(this.vmTaskStatusSubject, this.masterMonitor, this.config, new WorkerMetricsClient(this.masterMonitor), this.sinkSubscriptionStateHandlerFactory, this.userCodeClassLoader.asClassLoader()), getJobProviderClass(), this.userCodeClassLoader, this.mantisJob));
        log.info("Starting Mantis Worker for task {}", this);
        for (Service service : this.mantisServices) {
            log.info("Starting service: {}", service.getClass().getName());
            try {
                service.start();
            } catch (Throwable th) {
                log.error("Failed to start service {}: {}", service, th.getMessage(), th);
                throw th;
            }
        }
        // Only hand over the request once every service is up.
        executeStageSubject.onNext(this.wrappedExecuteStageRequest);
    }

    /**
     * Stops every registered service. A failure while stopping one service no longer prevents
     * the remaining services from being stopped: all services are attempted, and the first
     * failure (with later ones attached as suppressed) is rethrown afterwards.
     */
    @Override
    protected void shutDown() {
        log.info("Attempting to cancel task {}", this);
        Throwable firstFailure = null;
        for (Service service : this.mantisServices) {
            log.info("Stopping service: {}", service.getClass().getName());
            try {
                service.shutdown();
            } catch (Throwable th) {
                log.error("Failed to stop service {}: {}", service, th.getMessage(), th);
                if (firstFailure == null) {
                    firstFailure = th;
                } else if (firstFailure != th) {
                    // addSuppressed rejects self-suppression, hence the identity check.
                    firstFailure.addSuppressed(th);
                }
            }
        }
        if (firstFailure instanceof RuntimeException) {
            throw (RuntimeException) firstFailure;
        }
        if (firstFailure instanceof Error) {
            throw (Error) firstFailure;
        }
        if (firstFailure != null) {
            throw new RuntimeException(firstFailure);
        }
    }

    /** Returns the job provider class name carried by the execute-stage request. */
    private Optional<String> getJobProviderClass() {
        return this.executeStageRequest.getNameOfJobProviderClass();
    }

    /**
     * Returns a single status stream obtained by flattening all status observables published
     * on the task status subject.
     */
    protected Observable<Status> getStatus() {
        return this.tasksStatusSubject.flatMap(statusObservable -> statusObservable);
    }

    /** Returns the stream of VM-level task status updates. */
    public Observable<VirtualMachineTaskStatus> getVMStatus() {
        return this.vmTaskStatusSubject;
    }

    /**
     * Returns the id of the worker from the execute-stage request. Requires that one of the
     * {@code initialize} overloads has been called first.
     */
    public String getWorkerId() {
        return this.executeStageRequest.getWorkerId().getId();
    }
}
