package io.mantisrx.server.agent;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;
import io.github.resilience4j.retry.event.RetryOnRetryEvent;
import io.mantisrx.common.Ack;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.server.agent.utils.DurableBooleanState;
import io.mantisrx.server.agent.utils.ExponentialBackoffAbstractScheduledService;
import io.mantisrx.server.master.resourcecluster.ResourceClusterGateway;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.shaded.com.google.common.util.concurrent.AbstractScheduledService;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/mantisrx/server/agent/ResourceManagerGatewayCxn.class */
/**
 * Connection from a task executor to a single resource cluster gateway.
 *
 * <p>On start-up the connection either registers the task executor with the gateway or, when the
 * {@code alreadyRegistered} flag is already set, sends a heartbeat instead. Each call to
 * {@link #runIteration()} sends one heartbeat carrying the task executor's current report, and
 * {@link #shutDown()} disconnects the task executor from the gateway.
 *
 * <p>NOTE(review): scheduling, back-off between failed iterations and {@code getRetryCount()} are
 * inherited from {@link ExponentialBackoffAbstractScheduledService}, which is not visible here;
 * statements about them below are assumptions to confirm against that class.
 */
class ResourceManagerGatewayCxn extends ExponentialBackoffAbstractScheduledService {
    private static final Logger log = LoggerFactory.getLogger(ResourceManagerGatewayCxn.class);

    /** Index of this connection; only used to build the service name. */
    private final int idx;
    /** Registration payload; also the source of the task executor id and cluster id. */
    private final TaskExecutorRegistration taskExecutorRegistration;
    /** Gateway currently targeted; volatile because it can be swapped through {@link #setGateway}. */
    private volatile ResourceClusterGateway gateway;
    /** Delay between two heartbeat iterations. */
    private final Time heartBeatInterval;
    /** Maximum wait for a registration or heartbeat response; disconnection waits twice as long. */
    private final Time heartBeatTimeout;
    /** Fixed one-second value; not read anywhere in this class. */
    private final Time timeout = Time.of(1000L, TimeUnit.MILLISECONDS);
    private final long registrationRetryInitialDelayMillis;
    private final double registrationRetryMultiplier;
    private final double registrationRetryRandomizationFactor;
    private final int registrationRetryMaxAttempts;
    /** Retry count at or above which a failed heartbeat marks the connection as unregistered. */
    private final int tolerableConsecutiveHeartbeatFailures;
    /** Source of the report attached to every heartbeat. */
    private final TaskExecutor taskExecutor;
    /** In-memory registration status exposed through {@link #isRegistered()}. */
    private volatile boolean registered = false;
    private final Counter heartbeatTimeoutCounter;
    private final Counter heartbeatFailureCounter;
    private final Counter taskExecutorRegistrationFailureCounter;
    private final Counter taskExecutorDisconnectionFailureCounter;
    private final Counter taskExecutorRegistrationCounter;
    private final Counter taskExecutorDisconnectionCounter;
    /** Read at start-up to choose between registering and heartbeating; set once start-up succeeds. */
    private final DurableBooleanState alreadyRegistered;

    /**
     * Creates the connection; nothing is sent to the gateway until {@link #startUp()} runs.
     *
     * @param i index of this connection, used in the service name
     * @param taskExecutorRegistration registration sent to the gateway
     * @param resourceClusterGateway gateway to talk to initially
     * @param time interval between heartbeats
     * @param time2 timeout applied to registration and heartbeat calls
     * @param taskExecutor task executor whose current report is sent with each heartbeat
     * @param i2 tolerable number of consecutive heartbeat failures
     * @param j first argument forwarded to the superclass constructor (presumably the initial
     *     back-off delay in milliseconds; confirm against the superclass)
     * @param j2 second argument forwarded to the superclass constructor (presumably the maximum
     *     back-off delay in milliseconds; confirm against the superclass)
     * @param j3 initial delay, in milliseconds, of the registration retry back-off
     * @param d multiplier of the registration retry back-off
     * @param d2 randomization factor of the registration retry back-off
     * @param i3 maximum number of registration attempts
     * @param durableBooleanState flag telling whether the task executor is already registered
     */
    public ResourceManagerGatewayCxn(int i, TaskExecutorRegistration taskExecutorRegistration, ResourceClusterGateway resourceClusterGateway, Time time, Time time2, TaskExecutor taskExecutor, int i2, long j, long j2, long j3, double d, double d2, int i3, DurableBooleanState durableBooleanState) {
        super(j, j2);
        this.tolerableConsecutiveHeartbeatFailures = i2;
        this.idx = i;
        this.taskExecutorRegistration = taskExecutorRegistration;
        this.gateway = resourceClusterGateway;
        this.heartBeatInterval = time;
        this.heartBeatTimeout = time2;
        this.taskExecutor = taskExecutor;
        this.registrationRetryInitialDelayMillis = j3;
        this.registrationRetryMultiplier = d;
        this.registrationRetryRandomizationFactor = d2;
        this.registrationRetryMaxAttempts = i3;
        this.alreadyRegistered = durableBooleanState;

        Metrics metrics = new Metrics.Builder()
            .id(new MetricGroupId("TaskExecutor"))
            .addCounter("heartbeatTimeout")
            .addCounter("heartbeatFailure")
            .addCounter("taskExecutorRegistrationFailure")
            .addCounter("taskExecutorDisconnectionFailure")
            .addCounter("taskExecutorRegistration")
            .addCounter("taskExecutorDisconnection")
            .build();
        this.heartbeatTimeoutCounter = metrics.getCounter("heartbeatTimeout");
        this.heartbeatFailureCounter = metrics.getCounter("heartbeatFailure");
        this.taskExecutorRegistrationFailureCounter = metrics.getCounter("taskExecutorRegistrationFailure");
        this.taskExecutorDisconnectionFailureCounter = metrics.getCounter("taskExecutorDisconnectionFailure");
        this.taskExecutorRegistrationCounter = metrics.getCounter("taskExecutorRegistration");
        this.taskExecutorDisconnectionCounter = metrics.getCounter("taskExecutorDisconnection");
    }

    /** Returns the service name, {@code "ResourceManagerGatewayCxn-"} followed by the index. */
    protected String serviceName() {
        return "ResourceManagerGatewayCxn-" + this.idx;
    }

    /** Returns a fixed-delay schedule with no initial delay and the heartbeat interval as period. */
    protected AbstractScheduledService.Scheduler scheduler() {
        return AbstractScheduledService.Scheduler.newFixedDelaySchedule(0L, this.heartBeatInterval.getSize(), this.heartBeatInterval.getUnit());
    }

    /**
     * Establishes the connection with the gateway.
     *
     * <p>When the {@code alreadyRegistered} flag is set, a single heartbeat is sent; otherwise the
     * task executor is registered, with retries. On success the connection is marked registered
     * and the flag is set to {@code true}.
     *
     * @throws Exception the registration or heartbeat failure; before it is rethrown a
     *     disconnection is attempted, and a failure of that attempt is attached to the thrown
     *     exception as a suppressed exception
     */
    public void startUp() throws Exception {
        try {
            if (this.alreadyRegistered.getState()) {
                log.info("Registered with resource manager {} already", this.gateway);
                runIteration();
            } else {
                log.info("Trying to register with resource manager {}", this.gateway);
                registerTaskExecutorWithRetry();
            }
            this.registered = true;
            this.alreadyRegistered.setState(true);
        } catch (Exception e) {
            log.error("Registration to gateway {} has failed; Disconnecting now to be safe", this.gateway, e);
            try {
                disconnectTaskExecutor();
            } catch (Exception disconnectFailure) {
                // Best effort: disconnectTaskExecutor() has already logged and counted the failure.
                // Keep it reachable from the primary exception instead of dropping it silently.
                if (disconnectFailure != e) {
                    e.addSuppressed(disconnectFailure);
                }
            }
            throw e;
        }
    }

    /** Retry listener: logs the retry event and counts it as a registration failure. */
    private void onRegistrationRetry(RetryOnRetryEvent retryOnRetryEvent) {
        log.info("Retrying task executor registration. {}", retryOnRetryEvent);
        this.taskExecutorRegistrationFailureCounter.increment();
    }

    /**
     * Registers the task executor, retrying with an exponential random back-off built from the
     * configured initial delay, multiplier, randomization factor and maximum attempts.
     *
     * @throws RuntimeException wrapping whatever the decorated registration call finally throws
     */
    private void registerTaskExecutorWithRetry() throws Exception {
        IntervalFunction backoff = IntervalFunction.ofExponentialRandomBackoff(
            this.registrationRetryInitialDelayMillis,
            this.registrationRetryMultiplier,
            this.registrationRetryRandomizationFactor);
        RetryConfig retryConfig = RetryConfig.custom()
            .maxAttempts(this.registrationRetryMaxAttempts)
            .intervalFunction(backoff)
            .build();
        Retry retry = RetryRegistry.of(retryConfig).retry("ResourceManagerGatewayCxn:registerTaskExecutor", retryConfig);
        retry.getEventPublisher().onRetry(this::onRegistrationRetry);
        try {
            Retry.decorateCheckedSupplier(retry, this::registerTaskExecutor).apply();
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }

    /**
     * Performs one registration attempt and waits up to the heartbeat timeout for the answer.
     * The registration counter is incremented on every attempt.
     */
    private Ack registerTaskExecutor() throws ExecutionException, InterruptedException, TimeoutException {
        this.taskExecutorRegistrationCounter.increment();
        return (Ack) this.gateway.registerTaskExecutor(this.taskExecutorRegistration).get(this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
    }

    /**
     * Asks the gateway to disconnect this task executor, waiting up to twice the heartbeat timeout.
     * A failure is logged, counted and rethrown.
     */
    private void disconnectTaskExecutor() throws ExecutionException, InterruptedException, TimeoutException {
        this.taskExecutorDisconnectionCounter.increment();
        try {
            TaskExecutorDisconnection disconnection = new TaskExecutorDisconnection(
                this.taskExecutorRegistration.getTaskExecutorID(),
                this.taskExecutorRegistration.getClusterID());
            this.gateway.disconnectTaskExecutor(disconnection).get(2 * this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
        } catch (Exception e) {
            log.error("Disconnection has failed", e);
            this.taskExecutorDisconnectionFailureCounter.increment();
            throw e;
        }
    }

    /**
     * Sends one heartbeat: fetches the task executor's current report, forwards it to the gateway
     * and waits up to the heartbeat timeout. Success marks the connection as registered.
     *
     * @throws Exception the heartbeat failure, after it has been counted (timeouts and other
     *     failures use separate counters) and passed to the failure handling
     */
    @Override // io.mantisrx.server.agent.utils.ExponentialBackoffAbstractScheduledService
    protected void runIteration() throws Exception {
        try {
            this.taskExecutor.getCurrentReport().thenComposeAsync(taskExecutorReport -> {
                log.debug("Sending heartbeat to resource manager {} with report {}", this.gateway, taskExecutorReport);
                return this.gateway.heartBeatFromTaskExecutor(new TaskExecutorHeartbeat(this.taskExecutorRegistration.getTaskExecutorID(), this.taskExecutorRegistration.getClusterID(), taskExecutorReport));
            }).get(this.heartBeatTimeout.getSize(), this.heartBeatTimeout.getUnit());
            this.registered = true;
        } catch (TimeoutException e) {
            this.heartbeatTimeoutCounter.increment();
            handleHeartbeatFailure(e);
            throw e;
        } catch (Exception e2) {
            this.heartbeatFailureCounter.increment();
            handleHeartbeatFailure(e2);
            throw e2;
        }
    }

    /**
     * Logs a heartbeat failure and marks the connection as unregistered once the inherited retry
     * count has reached the tolerable number of consecutive failures; below that the failure is
     * only logged.
     */
    private void handleHeartbeatFailure(Exception exc) {
        log.error("Failed to send heartbeat to gateway {}", this.gateway, exc);
        if (getRetryCount() >= this.tolerableConsecutiveHeartbeatFailures) {
            this.registered = false;
        } else {
            log.info("Ignoring heartbeat failure to gateway {} due to failed heartbeats {} <= {}", this.gateway, getRetryCount(), this.tolerableConsecutiveHeartbeatFailures);
        }
    }

    /**
     * Marks the connection as unregistered and disconnects the task executor from the gateway.
     *
     * @throws Exception if the disconnection fails or times out
     */
    public void shutDown() throws Exception {
        this.registered = false;
        disconnectTaskExecutor();
    }

    @Override
    public String toString() {
        return "ResourceManagerGatewayCxn(gateway=" + getGateway() + ")";
    }

    /** Returns the gateway this connection currently targets. */
    public ResourceClusterGateway getGateway() {
        return this.gateway;
    }

    /** Replaces the gateway used by subsequent calls. */
    public void setGateway(ResourceClusterGateway resourceClusterGateway) {
        this.gateway = resourceClusterGateway;
    }

    /**
     * Returns whether the connection is currently considered registered: set by a successful
     * start-up or heartbeat, cleared on shutdown or when heartbeat failures reach the threshold.
     */
    public boolean isRegistered() {
        return this.registered;
    }
}
