package org.flinkextended.flink.ml.cluster.node.runner;

import java.io.IOException;
import java.util.Objects;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.rpc.AMClient;
import org.flinkextended.flink.ml.cluster.rpc.AMRegistry;
import org.flinkextended.flink.ml.cluster.rpc.NodeServer;
import org.flinkextended.flink.ml.cluster.rpc.RpcCode;
import org.flinkextended.flink.ml.proto.NodeSpec;
import org.flinkextended.flink.ml.proto.SimpleResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/node/runner/NodeHeartBeatRunner.class */
/**
 * Background task that periodically sends a heartbeat for this node to the application master (AM).
 *
 * <p>Each iteration sleeps for {@link #HEARTBEAT_INTERVAL_MS}, lazily obtains an {@link AMClient}
 * through {@link AMRegistry}, and sends a heartbeat carrying the version and node spec supplied at
 * construction. The loop ends when
 * <ul>
 *   <li>the stop flag is set via {@link #setStopFlag(boolean)},</li>
 *   <li>the thread is interrupted,</li>
 *   <li>the AM answers with {@link RpcCode#VERSION_ERROR} (a RESTART command is then set on the
 *       node server), or</li>
 *   <li>re-resolving the AM client fails with a {@code FlinkKillException}.</li>
 * </ul>
 * The AM client is closed on every exit path, including unexpected errors.
 *
 * <p>Only the stop flag is meant to be touched from outside {@link #run()}; the client and the
 * last-failure bookkeeping are accessed solely by the running thread.
 */
public class NodeHeartBeatRunner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(NodeHeartBeatRunner.class);

    /** Pause between two heartbeat attempts, in milliseconds (passed to {@link Thread#sleep(long)}). */
    private static final long HEARTBEAT_INTERVAL_MS = 5000;

    /**
     * Second argument handed to {@code AMRegistry.getAMClient}.
     * NOTE(review): presumably a lookup timeout in milliseconds - confirm against AMRegistry.
     */
    private static final long AM_CLIENT_TIMEOUT = 1000L;

    /** Set from another thread to end the loop; volatile so the heartbeat thread observes the write. */
    private volatile boolean stop = false;

    /** Most recent heartbeat/reconnect failure; used to avoid logging the same reason as "new" repeatedly. */
    private Exception lastException = null;

    /** Lazily created AM connection; {@code null} whenever no usable connection is held. */
    private AMClient amClient = null;

    private final MLContext mlContext;
    private final NodeServer server;
    private final NodeSpec nodeSpec;
    private final long version;

    /**
     * Creates a heartbeat task for one node.
     *
     * @param mlContext context used to resolve the AM client and to identify this node in log output
     * @param server    node server that receives the RESTART command on a version mismatch
     * @param nodeSpec  node description sent with every heartbeat
     * @param version   version sent with every heartbeat
     */
    public NodeHeartBeatRunner(MLContext mlContext, NodeServer server, NodeSpec nodeSpec, long version) {
        this.mlContext = mlContext;
        this.server = server;
        this.nodeSpec = nodeSpec;
        this.version = version;
    }

    /**
     * Sets the stop flag checked at the top of every heartbeat iteration.
     *
     * <p>The flag is only read between iterations, so a running thread may still complete the
     * current sleep and heartbeat before it exits.
     *
     * @param stop {@code true} to make the loop exit, {@code false} to let it continue
     */
    public void setStopFlag(boolean stop) {
        this.stop = stop;
    }

    /**
     * Runs the heartbeat loop until stopped, interrupted, told to restart, or killed.
     *
     * <p>Failures while sending a heartbeat are logged and answered with a reconnect attempt; the
     * loop then simply waits for the next interval. Any other {@link Throwable} propagates to the
     * caller after the AM client has been closed.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        try {
            while (!stop) {
                SimpleResponse response;
                try {
                    Thread.sleep(HEARTBEAT_INTERVAL_MS);
                    if (amClient == null) {
                        amClient = AMRegistry.getAMClient(mlContext, AM_CLIENT_TIMEOUT);
                    }
                    response = amClient.heartbeat(version, nodeSpec);
                } catch (InterruptedException e) {
                    LOG.info("{} heartbeat thread interrupted", mlContext.getIdentity());
                    interrupted = true;
                    return;
                } catch (Exception e) {
                    if (!recoverFromFailure(e)) {
                        return;
                    }
                    // No response was received in this iteration, so there is nothing to inspect;
                    // wait for the next interval instead of reading an unassigned response.
                    continue;
                }

                if (response.getCode() == RpcCode.VERSION_ERROR.ordinal()) {
                    LOG.warn("{} heartbeat wrong version {}, terminating heartbeat thread and restart tf node", mlContext.getIdentity(), version);
                    server.setAmCommand(NodeServer.AMCommand.RESTART);
                    return;
                }
                if (response.getCode() != RpcCode.OK.ordinal()) {
                    LOG.warn("{} heartbeat response code {}", mlContext.getIdentity(), response.getCode());
                }
                // A heartbeat round-trip completed, so the next failure is reported as a new one.
                lastException = null;
            }
        } finally {
            try {
                closeClient();
            } finally {
                // Restore the interrupt status only after cleanup so that closing the client
                // does not run with a pending interrupt.
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Logs a heartbeat failure and tries to replace the AM connection.
     *
     * @param failure the exception raised while sleeping, resolving the client, or sending the heartbeat
     * @return {@code true} if the loop should keep running (reconnect succeeded or failed with an
     *         {@link IOException}); {@code false} if a {@code FlinkKillException} requires the loop to end
     */
    private boolean recoverFromFailure(Exception failure) {
        // Objects.equals keeps the comparison safe when either exception carries no message.
        boolean sameReason = lastException != null
                && Objects.equals(lastException.getMessage(), failure.getMessage());
        if (sameReason) {
            LOG.warn("{} heartbeat failed with same reason: {}, stacktrace suppressed", mlContext.getIdentity(), lastException.getMessage());
        } else {
            LOG.warn("{} failed to send heartbeat to AM {}", mlContext.getIdentity(), failure.getMessage());
            lastException = failure;
        }

        try {
            if (amClient != null) {
                LOG.info("{} closing old AM connection", mlContext.getIdentity());
                closeClient();
            }
            amClient = AMRegistry.getAMClient(mlContext, AM_CLIENT_TIMEOUT);
            LOG.info("{} reconnect AM connection", mlContext.getIdentity());
            return true;
        } catch (FlinkKillException e) {
            LOG.warn("{} failed to update am address", mlContext.getIdentity(), e);
            return false;
        } catch (IOException e) {
            LOG.warn("{} failed to update am address error {}", mlContext.getIdentity(), e.getMessage());
            lastException = e;
            // amClient stays null here; the next iteration retries the lookup.
            return true;
        }
    }

    /** Closes the current AM client, if any, and forgets the reference. */
    private void closeClient() {
        if (amClient != null) {
            amClient.close();
            amClient = null;
        }
    }
}
