package org.flinkextended.flink.ml.cluster.rpc;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.flinkextended.flink.ml.cluster.master.meta.AMMeta;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.node.runner.FlinkKillException;
import org.flinkextended.flink.ml.cluster.storage.Storage;
import org.flinkextended.flink.ml.cluster.storage.StorageFactory;
import org.flinkextended.flink.ml.util.MLConstants;
import org.flinkextended.flink.ml.util.MLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/cluster/rpc/AMRegistry.class */
/**
 * Static helpers that look up the AM address published in {@link Storage} under
 * {@link AMMeta#AM_ADDRESS} and return an {@link AMClient} connected to it.
 *
 * <p>The lookup polls the storage until an address is available and the client reports ready,
 * or until the given timeout elapses.
 */
public class AMRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(AMRegistry.class);

    /** Pause between storage polls, and the per-attempt budget passed to {@code waitForReady}. */
    private static final Duration WAIT_INTERVAL = Duration.ofSeconds(5);

    /**
     * Gets an AM client using the timeout configured in the context properties under
     * {@link MLConstants#AM_REGISTRY_TIMEOUT}, falling back to
     * {@link MLConstants#AM_REGISTRY_TIMEOUT_DEFAULT}.
     *
     * @param mLContext context whose properties supply the storage configuration and timeout
     * @return a client for which {@code waitForReady} returned {@code true}
     * @throws IOException if the lookup fails or times out
     * @throws NumberFormatException if the configured timeout is not a valid long
     */
    public static AMClient getAMClient(MLContext mLContext) throws IOException {
        String configuredTimeout =
                mLContext
                        .getProperties()
                        .getOrDefault(
                                MLConstants.AM_REGISTRY_TIMEOUT,
                                MLConstants.AM_REGISTRY_TIMEOUT_DEFAULT);
        return getAMClient(mLContext, Long.parseLong(configuredTimeout));
    }

    /**
     * Gets an AM client using the properties of the given context and an explicit timeout.
     *
     * @param mLContext context whose properties are passed to the storage factory
     * @param j timeout in milliseconds
     * @return a client for which {@code waitForReady} returned {@code true}
     * @throws IOException if the lookup fails or times out
     */
    public static AMClient getAMClient(MLContext mLContext, long j) throws IOException {
        return getAMClient(mLContext.getProperties(), j);
    }

    /**
     * Polls the storage for the AM address and connects to it.
     *
     * <p>The address is expected in {@code host:port} form. If the published address changes
     * between polls, the previous client is closed and a new one is created. The storage handle
     * is always closed before this method exits; the client is closed on every exit path except
     * a successful return.
     *
     * @param map properties passed to {@link StorageFactory#getStorageInstance}
     * @param j timeout in milliseconds, compared against elapsed wall-clock time
     * @return a client for which {@code waitForReady} returned {@code true}
     * @throws IOException if the storage cannot be read
     * @throws MLException if the timeout elapses (cause is a {@link TimeoutException}) or the
     *     published address is not of the form {@code host:port}
     * @throws FlinkKillException if the calling thread is interrupted while waiting
     * @throws NumberFormatException if the port part of the address is not an integer
     */
    public static AMClient getAMClient(Map<String, String> map, long j) throws IOException {
        Storage storage = null;
        AMClient client = null;
        // Set only when the client is handed to the caller, so cleanup can tell success from
        // failure.
        boolean handedOver = false;
        try {
            storage = StorageFactory.getStorageInstance(map);
            final long startMillis = System.currentTimeMillis();
            while (true) {
                long elapsedMillis = System.currentTimeMillis() - startMillis;
                if (elapsedMillis > j) {
                    throw new MLException(
                            "Fail to get AM connection.",
                            new TimeoutException(
                                    String.format(
                                            "AM not ready after %d seconds time out %d",
                                            elapsedMillis / 1000, j / 1000)));
                }

                byte[] value = storage.getValue(AMMeta.AM_ADDRESS);
                if (value == null) {
                    // Address not published yet; wait before polling again.
                    Thread.sleep(WAIT_INTERVAL.toMillis());
                    continue;
                }

                String address = new String(value);
                LOG.info("AM address is: {}", address);
                String[] hostAndPort = address.split(":");
                if (hostAndPort.length != 2) {
                    LOG.error("AM ip port not validate:{}", address);
                    throw new MLException("AM ip port not validate:" + address);
                }
                String host = hostAndPort[0];
                int port = Integer.parseInt(hostAndPort[1]);

                // Reuse the existing client unless the published address has changed.
                boolean sameEndpoint =
                        client != null && client.getHost().equals(host) && client.getPort() == port;
                if (!sameEndpoint) {
                    if (client != null) {
                        client.close();
                        // Drop the reference so cleanup does not close it a second time if
                        // constructing the replacement fails.
                        client = null;
                    }
                    client = new AMClient(host, port);
                }

                if (client.waitForReady(WAIT_INTERVAL)) {
                    handedOver = true;
                    return client;
                }
            }
        } catch (InterruptedException e) {
            throw new FlinkKillException("Interrupted getting AM client", e);
        } finally {
            try {
                // Release the client on every exit path except a successful return.
                if (!handedOver && client != null) {
                    client.close();
                }
            } finally {
                // The storage handle is only needed for the lookup itself.
                if (storage != null) {
                    storage.close();
                }
            }
        }
    }
}
