package io.mantisrx.server.master.client;

import com.spotify.futures.CompletableFutures;
import io.mantisrx.common.Ack;
import io.mantisrx.common.Label;
import io.mantisrx.common.network.Endpoint;
import io.mantisrx.runtime.JobSla;
import io.mantisrx.runtime.MantisJobDefinition;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.NamedJobDefinition;
import io.mantisrx.runtime.WorkerMigrationConfig;
import io.mantisrx.runtime.codec.JsonCodec;
import io.mantisrx.runtime.descriptor.DeploymentStrategy;
import io.mantisrx.runtime.descriptor.SchedulingInfo;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.server.core.JobAssignmentResult;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.core.NamedJobInfo;
import io.mantisrx.server.core.PostJobStatusRequest;
import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.core.type.TypeReference;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.DeserializationFeature;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.shaded.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.reactivex.mantis.remote.observable.ConnectToObservable;
import io.reactivex.mantis.remote.observable.DynamicConnectionSet;
import io.reactivex.mantis.remote.observable.ToDeltaEndpointInjector;
import io.reactivex.mantis.remote.observable.reconciliator.Reconciliator;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import mantis.io.reactivex.netty.protocol.http.websocket.WebSocketClient;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Dsl;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

/* loaded from: input_file:io/mantisrx/server/master/client/MantisMasterClientApi.class */
public class MantisMasterClientApi implements MantisMasterGateway {
    static final String ConnectTimeoutSecsPropertyName = "MantisClientConnectTimeoutSecs";
    private static final String JOB_METADATA_FIELD = "jobMetadata";
    private static final String STAGE_MEDATA_LIST_FIELD = "stageMetadataList";
    private static final String STAGE_NUM_FIELD = "stageNum";
    private static final String NUM_STAGES_FIELD = "numStages";
    private static final int MAX_REDIRECTS = 10;
    private static final String API_JOBS_LIST_PATH = "/api/jobs/list";
    private static final String API_JOBS_LIST_MATCHING_PATH = "/api/jobs/list/matching";
    private static final String API_JOB_SUBMIT_PATH = "/api/submit";
    private static final String API_JOB_NAME_CREATE = "/api/namedjob/create";
    private static final String API_JOB_NAME_UPDATE = "/api/namedjob/update";
    private static final String API_JOB_NAME_LIST = "/api/namedjob/list";
    private static final String API_JOB_KILL = "/api/jobs/kill";
    private static final String API_JOB_STAGE_SCALE = "/api/jobs/scaleStage";
    private static final String API_JOB_RESUBMIT_WORKER = "/api/jobs/resubmitWorker";
    private static final int SUBSCRIBE_ATTEMPTS_TO_MASTER = 100;
    private static final int MAX_RANDOM_WAIT_RETRY_SEC = 10;
    private static final long MASTER_SCHED_INFO_HEARTBEAT_INTERVAL_SECS = 120;
    private final Observable<Endpoint> masterEndpoint;
    private final int subscribeAttemptsToMaster;
    private MasterMonitor masterMonitor;
    private static final Logger logger = LoggerFactory.getLogger(MantisMasterClientApi.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    final String DEFAULT_RESPONSE = "NO_RESPONSE_FROM_MASTER";
    private final long GET_TIMEOUT_SECS = 30;
    private final Func1<Observable<? extends Throwable>, Observable<?>> retryLogic = observable -> {
        return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (th, num) -> {
            return num;
        }).flatMap(num2 -> {
            long intValue = 2 * (num2.intValue() > 10 ? 10 : num2.intValue());
            logger.info(": retrying conx after sleeping for " + intValue + " secs");
            return Observable.timer(intValue, TimeUnit.SECONDS);
        });
    };
    private final Func1<Observable<? extends Void>, Observable<?>> repeatLogic = observable -> {
        return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (r2, num) -> {
            return num;
        }).flatMap(num2 -> {
            long intValue = 2 * (num2.intValue() > 10 ? 10 : num2.intValue());
            logger.warn("On Complete received! : repeating conx after sleeping for " + intValue + " secs");
            return Observable.timer(intValue, TimeUnit.SECONDS);
        });
    };
    private final AsyncHttpClient apiClient = Dsl.asyncHttpClient();

    public MantisMasterClientApi(MasterMonitor masterMonitor) {
        this.masterMonitor = masterMonitor;
        this.masterEndpoint = masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).map(masterDescription2 -> {
            logger.info("New Mantis Master notification, host: " + masterDescription2.getHostname() + ", swapping out client API connection to new master.");
            return new Endpoint(masterDescription2.getHostname(), masterDescription2.getApiPortV2());
        });
        String property = System.getProperty(ConnectTimeoutSecsPropertyName);
        if (property != null) {
            try {
                Math.max(1, (int) Math.sqrt(2.0d * Long.parseLong(property)));
            } catch (NumberFormatException e) {
                logger.warn("Invalid number for connectTimeoutSecs: " + property);
            }
        }
        this.subscribeAttemptsToMaster = Integer.MAX_VALUE;
    }

    private String toUri(MasterDescription masterDescription, String str) {
        return "http://" + masterDescription.getHostname() + ":" + masterDescription.getApiPort() + str;
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, SchedulingInfo schedulingInfo) {
        return submitJob(str, str2, list, jobSla, 0L, schedulingInfo, WorkerMigrationConfig.DEFAULT);
    }

    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, WorkerMigrationConfig workerMigrationConfig) {
        return submitJob(str, str2, list, jobSla, j, schedulingInfo, false, workerMigrationConfig);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo) {
        return submitJob(str, str2, list, jobSla, j, schedulingInfo, false, WorkerMigrationConfig.DEFAULT);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z) {
        return submitJob(str, str2, list, jobSla, j, schedulingInfo, z, WorkerMigrationConfig.DEFAULT);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z, WorkerMigrationConfig workerMigrationConfig) {
        return submitJob(str, str2, list, jobSla, j, schedulingInfo, z, workerMigrationConfig, new LinkedList());
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z, WorkerMigrationConfig workerMigrationConfig, List<Label> list2) {
        try {
            return submitJob(getJobDefinitionString(str, null, str2, list, jobSla, j, schedulingInfo, z, workerMigrationConfig, list2, null));
        } catch (MalformedURLException | JsonProcessingException e) {
            return Observable.error(e);
        }
    }

    public Observable<JobSubmitResponse> submitJob(String str, String str2, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z, WorkerMigrationConfig workerMigrationConfig, List<Label> list2, DeploymentStrategy deploymentStrategy) {
        try {
            return submitJob(getJobDefinitionString(str, null, str2, list, jobSla, j, schedulingInfo, z, workerMigrationConfig, list2, deploymentStrategy));
        } catch (MalformedURLException | JsonProcessingException e) {
            return Observable.error(e);
        }
    }

    public Observable<JobSubmitResponse> submitJob(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            String str2 = "http://" + masterDescription2.getHostname() + ":" + masterDescription2.getApiPort() + API_JOB_SUBMIT_PATH;
            logger.info("Doing POST on " + str2);
            try {
                return getPostResponse(str2, str).onErrorResumeNext(th -> {
                    logger.warn("Can't connect to master: {}", th.getMessage(), th);
                    return Observable.empty();
                }).map(str3 -> {
                    return new JobSubmitResponse(str3, false, null);
                });
            } catch (Exception e) {
                return Observable.error(e);
            }
        });
    }

    private String getJobDefinitionString(String str, String str2, String str3, List<Parameter> list, JobSla jobSla, long j, SchedulingInfo schedulingInfo, boolean z, WorkerMigrationConfig workerMigrationConfig, List<Label> list2, DeploymentStrategy deploymentStrategy) throws JsonProcessingException, MalformedURLException {
        return objectMapper.writeValueAsString(new MantisJobDefinition(str, System.getProperty("user.name"), str2 == null ? null : new URL(str2), str3, list, jobSla, j, schedulingInfo, 0, 0, (String) null, (NamedJobDefinition.CronPolicy) null, z, workerMigrationConfig, list2, deploymentStrategy));
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Void> killJob(String str) {
        return killJob(str, "Unknown", "User requested");
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Void> killJob(String str, String str2, String str3) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            HashMap hashMap = new HashMap();
            hashMap.put("JobId", str);
            hashMap.put("user", str2);
            hashMap.put("reason", str3);
            try {
                return getPostResponse(toUri(masterDescription2, API_JOB_KILL), objectMapper.writeValueAsString(hashMap)).onErrorResumeNext(th -> {
                    logger.warn("Can't connect to master: {}", th.getMessage(), th);
                    return Observable.empty();
                }).map(str4 -> {
                    logger.info(str4);
                    return null;
                });
            } catch (JsonProcessingException e) {
                return Observable.error(e);
            }
        });
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Boolean> scaleJobStage(String str, int i, int i2, String str2) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).take(1).flatMap(masterDescription2 -> {
            StageScaleRequest stageScaleRequest = new StageScaleRequest(str, i, i2, str2);
            try {
                return submitPostRequest(toUri(masterDescription2, API_JOB_STAGE_SCALE), objectMapper.writeValueAsString(stageScaleRequest)).map(httpResponseStatus -> {
                    logger.info("POST to scale stage returned status: {}", httpResponseStatus);
                    return Boolean.valueOf(httpResponseStatus.codeClass().equals(HttpStatusClass.SUCCESS));
                });
            } catch (JsonProcessingException e) {
                logger.error("failed to serialize stage scale request {} to json", stageScaleRequest);
                return Observable.error(e);
            }
        });
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Boolean> resubmitJobWorker(String str, String str2, int i, String str3) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).take(1).flatMap(masterDescription2 -> {
            ResubmitJobWorkerRequest resubmitJobWorkerRequest = new ResubmitJobWorkerRequest(str, str2, i, str3);
            logger.info("sending request to resubmit worker {} for jobId {}", Integer.valueOf(i), str);
            try {
                return submitPostRequest(toUri(masterDescription2, API_JOB_RESUBMIT_WORKER), objectMapper.writeValueAsString(resubmitJobWorkerRequest)).map(httpResponseStatus -> {
                    logger.info("POST to resubmit worker {} returned status: {}", Integer.valueOf(i), httpResponseStatus);
                    return Boolean.valueOf(httpResponseStatus.codeClass().equals(HttpStatusClass.SUCCESS));
                });
            } catch (JsonProcessingException e) {
                logger.error("failed to serialize resubmit job worker request {} to json", resubmitJobWorkerRequest);
                return Observable.error(e);
            }
        });
    }

    private Observable<HttpResponseStatus> submitPostRequest(String str, String str2) {
        logger.info("sending POST request to {} content {}", str, str2);
        return RxNetty.createHttpRequest(HttpClientRequest.createPost(str).withContent(str2), new HttpClient.HttpClientConfig.Builder().build()).map(httpClientResponse -> {
            return httpClientResponse.getStatus();
        });
    }

    private Observable<String> getPostResponse(String str, String str2) {
        logger.info("sending POST request to {} content {}", str, str2);
        return RxNetty.createHttpRequest(HttpClientRequest.createPost(str).withContent(str2), new HttpClient.HttpClientConfig.Builder().build()).flatMap(httpClientResponse -> {
            return httpClientResponse.getContent();
        }).map(byteBuf -> {
            return byteBuf.toString(Charset.defaultCharset());
        });
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Boolean> namedJobExists(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            String str2 = "/api/namedjob/list/" + str;
            logger.info("Calling GET on " + str2);
            return HttpUtility.getGetResponse(masterDescription2.getHostname(), masterDescription2.getApiPort(), str2).onErrorResumeNext(th -> {
                logger.warn("Can't connect to master: {}", th.getMessage(), th);
                return Observable.error(th);
            }).map(str3 -> {
                logger.debug("Job cluster response: " + str3);
                return Boolean.valueOf(new JSONArray(str3).length() > 0);
            }).retryWhen(this.retryLogic);
        }).retryWhen(this.retryLogic);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<Integer> getSinkStageNum(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            String str2 = "/api/jobs/list/" + str;
            logger.info("Calling GET on " + str2);
            return HttpUtility.getGetResponse(masterDescription2.getHostname(), masterDescription2.getApiPort(), str2).onErrorResumeNext(th -> {
                logger.warn("Can't connect to master: {}", th.getMessage(), th);
                return Observable.error(th);
            }).flatMap(str3 -> {
                try {
                    logger.info("Got response for job info on " + str);
                    Integer sinkStageNumFromJsonResponse = getSinkStageNumFromJsonResponse(str, str3);
                    if (sinkStageNumFromJsonResponse.intValue() >= 0) {
                        return Observable.just(sinkStageNumFromJsonResponse);
                    }
                    logger.warn("Job " + str + " not found");
                    return Observable.error(new Exception("Job " + str + " not found, response: " + str3));
                } catch (MasterClientException e) {
                    logger.warn("Can't get sink stage info for " + str + ": " + e.getMessage());
                    return Observable.error(new Exception("Can't get sink stage info for " + str + ": " + e.getMessage(), e));
                }
            }).retryWhen(this.retryLogic);
        });
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<String> getJobsOfNamedJob(String str, MantisJobState.MetaState metaState) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            String str2 = "/api/jobs/list/matching/" + str;
            if (metaState != null) {
                str2 = str2 + "?jobState=" + metaState;
            }
            logger.info("Calling GET on " + str2);
            return HttpUtility.getGetResponse(masterDescription2.getHostname(), masterDescription2.getApiPort(), str2).onErrorResumeNext(th -> {
                logger.warn("Can't connect to master: {}", th.getMessage(), th);
                return Observable.empty();
            });
        }).retryWhen(this.retryLogic);
    }

    public Observable<Boolean> jobIdExists(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).switchMap(masterDescription2 -> {
            String str2 = "/api/jobs/list/" + str;
            logger.info("Calling GET on " + str2);
            return HttpUtility.getGetResponse(masterDescription2.getHostname(), masterDescription2.getApiPort(), str2).onErrorResumeNext(th -> {
                logger.warn("Can't connect to master: {}", th.getMessage(), th);
                return Observable.empty();
            });
        }).retryWhen(this.retryLogic).map(str2 -> {
            return Boolean.valueOf(!payloadIsError(str2));
        });
    }

    private boolean payloadIsError(String str) {
        try {
            return ((Map) objectMapper.readValue(str, new TypeReference<Map<String, String>>() { // from class: io.mantisrx.server.master.client.MantisMasterClientApi.1
            })).get("error") != null;
        } catch (Exception e) {
            return false;
        }
    }

    private Integer getSinkStageNumFromJsonResponse(String str, String str2) throws MasterClientException {
        String str3 = "Can't parse json response for job " + str;
        if (str2 == null) {
            logger.warn("Null info response from master for job " + str);
            throw new MasterClientException(str3);
        }
        try {
            JSONObject jSONObject = new JSONObject(str2);
            JSONObject optJSONObject = jSONObject.optJSONObject(JOB_METADATA_FIELD);
            if (optJSONObject == null) {
                logger.warn("Didn't find meta data for job " + str + " in json (" + str2 + ")");
                return -1;
            }
            String optString = optJSONObject.optString("state");
            if (optString == null) {
                throw new MasterClientException("Can't read job state in response (" + str2 + ")");
            }
            if (MantisJobState.isTerminalState(MantisJobState.valueOf(optString))) {
                logger.info("Can't get sink stage of job in state " + MantisJobState.valueOf(optString));
                return -1;
            }
            int i = 0;
            JSONArray optJSONArray = jSONObject.optJSONArray(STAGE_MEDATA_LIST_FIELD);
            if (optJSONArray == null) {
                logger.warn("Didn't find stages metadata for job " + str + " in json: " + str2);
                throw new MasterClientException(str3);
            }
            for (int i2 = 0; i2 < optJSONArray.length(); i2++) {
                i = Math.max(i, optJSONArray.getJSONObject(i2).optInt(STAGE_NUM_FIELD, 0));
            }
            if (i == 0) {
                logger.warn("Didn't find stageNum field in stage metadata json (" + str2 + ")");
                throw new MasterClientException(str3);
            }
            logger.info("Got sink stage number for job " + str + ": " + i);
            return Integer.valueOf(i);
        } catch (JSONException e) {
            logger.error("Error parsing info for job " + str + " from json data (" + str2 + "): " + e.getMessage());
            throw new MasterClientException(str3);
        }
    }

    private HttpClient<ByteBuf, ServerSentEvent> getRxnettySseClient(String str, int i) {
        return RxNetty.newHttpClientBuilder(str, i).pipelineConfigurator(PipelineConfigurators.clientSseConfigurator()).withNoConnectionPooling().build();
    }

    private WebSocketClient<TextWebSocketFrame, TextWebSocketFrame> getRxnettyWebSocketClient(String str, int i, String str2) {
        return RxNetty.newWebSocketClientBuilder(str, i).withWebSocketURI(str2).build();
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<String> getJobStatusObservable(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).retryWhen(this.retryLogic).switchMap(masterDescription2 -> {
            return getRxnettyWebSocketClient(masterDescription2.getHostname(), masterDescription2.getConsolePort(), "ws://" + masterDescription2.getHostname() + ":" + masterDescription2.getApiPort() + "/job/status/" + str).connect().flatMap(observableConnection -> {
                return observableConnection.getInput().map(textWebSocketFrame -> {
                    return textWebSocketFrame.text();
                });
            });
        }).onErrorResumeNext(Observable.empty());
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobSchedulingInfo> schedulingChanges(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).retryWhen(this.retryLogic).switchMap(masterDescription2 -> {
            return getRxnettySseClient(masterDescription2.getHostname(), masterDescription2.getSchedInfoPort()).submit(HttpClientRequest.createGet("/assignmentresults/" + str + "?sendHB=true")).flatMap(httpClientResponse -> {
                return !HttpResponseStatus.OK.equals(httpClientResponse.getStatus()) ? Observable.error(new Exception(httpClientResponse.getStatus().reasonPhrase())) : httpClientResponse.getContent().map(serverSentEvent -> {
                    try {
                        return (JobSchedulingInfo) objectMapper.readValue(serverSentEvent.contentAsString(), JobSchedulingInfo.class);
                    } catch (IOException e) {
                        throw new RuntimeException("Invalid schedInfo json: " + e.getMessage(), e);
                    }
                }).timeout(360L, TimeUnit.SECONDS).filter(jobSchedulingInfo -> {
                    return Boolean.valueOf((jobSchedulingInfo == null || "HB_JobId".equals(jobSchedulingInfo.getJobId())) ? false : true);
                }).distinctUntilChanged();
            });
        }).repeatWhen(this.repeatLogic).retryWhen(this.retryLogic);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<NamedJobInfo> namedJobInfo(String str) {
        return this.masterMonitor.getMasterObservable().filter(masterDescription -> {
            return Boolean.valueOf(masterDescription != null);
        }).retryWhen(this.retryLogic).switchMap(masterDescription2 -> {
            return getRxnettySseClient(masterDescription2.getHostname(), masterDescription2.getSchedInfoPort()).submit(HttpClientRequest.createGet("/namedjobs/" + str + "?sendHB=true")).flatMap(httpClientResponse -> {
                return !HttpResponseStatus.OK.equals(httpClientResponse.getStatus()) ? Observable.error(new Exception(httpClientResponse.getStatus().reasonPhrase())) : httpClientResponse.getContent().map(serverSentEvent -> {
                    try {
                        return (NamedJobInfo) objectMapper.readValue(serverSentEvent.contentAsString(), NamedJobInfo.class);
                    } catch (IOException e) {
                        throw new RuntimeException("Invalid namedJobInfo json: " + e.getMessage(), e);
                    }
                }).timeout(360L, TimeUnit.SECONDS).filter(namedJobInfo -> {
                    return Boolean.valueOf((namedJobInfo == null || "HB_JobId".equals(namedJobInfo.getName())) ? false : true);
                });
            });
        }).repeatWhen(this.repeatLogic).retryWhen(this.retryLogic);
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public Observable<JobAssignmentResult> assignmentResults(String str) {
        ConnectToObservable.Builder decoder = new ConnectToObservable.Builder().subscribeAttempts(this.subscribeAttemptsToMaster).name("/v1/api/master/assignmentresults").decoder(new JsonCodec(JobAssignmentResult.class));
        if (str != null && !str.isEmpty()) {
            HashMap hashMap = new HashMap();
            hashMap.put("jobId", str);
            decoder = decoder.subscribeParameters(hashMap);
        }
        return Observable.merge(new Reconciliator.Builder().name("master-jobAssignmentResults").connectionSet(DynamicConnectionSet.create(decoder, 10)).injector(new ToDeltaEndpointInjector(this.masterEndpoint.map(endpoint -> {
            ArrayList arrayList = new ArrayList(1);
            arrayList.add(endpoint);
            return arrayList;
        }))).build().observables());
    }

    @Override // io.mantisrx.server.master.client.MantisMasterGateway
    public CompletableFuture<Ack> updateStatus(Status status) {
        try {
            return this.apiClient.executeRequest(Dsl.post(this.masterMonitor.getLatestMaster().getFullApiStatusUri()).setBody(objectMapper.writeValueAsString(new PostJobStatusRequest(status.getJobId(), status)))).toCompletableFuture().thenCompose(response -> {
                return response.getStatusCode() == 200 ? CompletableFuture.completedFuture(Ack.getInstance()) : CompletableFutures.exceptionallyCompletedFuture(new Exception(response.getResponseBody()));
            });
        } catch (Exception e) {
            return CompletableFutures.exceptionallyCompletedFuture(e);
        }
    }

    static {
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.registerModule(new Jdk8Module());
    }
}
