package ml.shifu.shifu.core.yarn.container;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import ml.shifu.guagua.coordinator.zk.GuaguaZooKeeper;
import ml.shifu.shifu.core.TrainingIntermediateResult;
import ml.shifu.shifu.core.yarn.util.Constants;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;

/* loaded from: input_file:ml/shifu/shifu/core/yarn/container/SocketServer.class */
/**
 * Thread that accepts local socket connections, parses the text messages received on them into
 * {@link TrainingIntermediateResult} objects and writes each result to ZooKeeper.
 *
 * <p>Each message is one line of comma-separated {@code key:value} fields. Recognized keys are
 * {@code worker_index}, {@code time}, {@code current_epoch}, {@code training_loss},
 * {@code valid_loss} and {@code valid_time}; any other key is logged and ignored. Reading from a
 * connection stops at the first blank line or at end of stream.
 *
 * <p>Connections are served one at a time on this thread. The thread stops (and closes its
 * listening socket) once it is interrupted.
 */
public class SocketServer extends Thread {
    private static final Log LOG = LogFactory.getLog(SocketServer.class);

    /** Listening socket; port 0 lets the OS pick a free port, exposed via {@link #getServerPort()}. */
    private final ServerSocket server = new ServerSocket(0);
    private final GuaguaZooKeeper zookeeper;
    private final String containerId;

    /**
     * Creates the server and binds its listening socket; call {@link #start()} to begin accepting.
     *
     * @param guaguaZooKeeper client used to publish parsed results
     * @param str container id, appended to {@code Constants.WORKER_INTERMEDIATE_RESULT_ROOT_PATH}
     *     to form the ZooKeeper node path that results are written to
     * @throws IOException if the listening socket cannot be opened
     */
    public SocketServer(GuaguaZooKeeper guaguaZooKeeper, String str) throws IOException {
        this.zookeeper = guaguaZooKeeper;
        this.containerId = str;
    }

    /**
     * Accept loop: serves one connection after another until the thread is interrupted or the
     * listening socket is closed. I/O and ZooKeeper failures on one connection are logged and do
     * not stop the loop.
     */
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted() && !this.server.isClosed()) {
                // try-with-resources closes the accepted socket on every exit path.
                try (Socket socket = this.server.accept()) {
                    LOG.info("got connection on port " + getServerPort());
                    handleConnection(socket);
                } catch (InterruptedException e) {
                    LOG.error("Writing zookeeper has some problem", e);
                    // Restore the interrupt status so the loop condition sees it and exits.
                    Thread.currentThread().interrupt();
                } catch (KeeperException e) {
                    LOG.error("Writing zookeeper has some problem", e);
                } catch (IOException e) {
                    LOG.error("Scoket Server has some problem", e);
                }
            }
        } finally {
            // The thread is finished; release the listening port instead of leaking it.
            closeServer();
        }
    }

    /**
     * Returns the local port the server socket is bound to.
     *
     * @return the listening port number
     */
    public int getServerPort() {
        return this.server.getLocalPort();
    }

    /** Reads messages line by line from one connection and publishes each parsed result. */
    private void handleConnection(Socket socket) throws IOException, KeeperException, InterruptedException {
        // Explicit charset so decoding does not depend on the platform default.
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
        for (String line = reader.readLine(); StringUtils.isNotBlank(line); line = reader.readLine()) {
            LOG.info("received: " + line);
            TrainingIntermediateResult result = parseMessage(line);
            if (result == null) {
                // Malformed message (already logged): skip it but keep serving this connection.
                continue;
            }
            LOG.info("After Parsing: " + result.toString());
            this.zookeeper.createOrSetExt(Constants.WORKER_INTERMEDIATE_RESULT_ROOT_PATH + this.containerId,
                    result.serialize(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true, -1);
        }
    }

    /**
     * Parses one comma-separated {@code key:value} message.
     *
     * @return the populated result, or {@code null} if a recognized field has a missing or
     *     non-numeric value (so that a half-filled result is never published)
     */
    private static TrainingIntermediateResult parseMessage(String line) {
        TrainingIntermediateResult result = new TrainingIntermediateResult();
        for (String field : line.split(",")) {
            String[] keyValue = field.split(":", 2);
            String key = keyValue[0];
            // A field without ':' has no value; the empty string makes the numeric parse below
            // fail cleanly for recognized keys instead of indexing past the end of the array.
            String value = keyValue.length > 1 ? keyValue[1] : "";
            try {
                if ("worker_index".equals(key)) {
                    result.setWorkerIndex(Integer.parseInt(value));
                } else if ("time".equals(key)) {
                    result.setCurrentEpochTime(Double.parseDouble(value));
                } else if ("current_epoch".equals(key)) {
                    result.setCurrentEpochStep(Integer.parseInt(value));
                } else if ("training_loss".equals(key)) {
                    result.setTrainingError(Double.parseDouble(value));
                } else if ("valid_loss".equals(key)) {
                    result.setValidError(Double.parseDouble(value));
                } else if ("valid_time".equals(key)) {
                    result.setCurrentEpochValidTime(Double.parseDouble(value));
                } else {
                    LOG.warn("There is unexpacted field in message: " + field);
                }
            } catch (NumberFormatException e) {
                LOG.warn("Skipping malformed message, invalid value in field: " + field, e);
                return null;
            }
        }
        return result;
    }

    /** Closes the listening socket, logging (not propagating) any failure. */
    private void closeServer() {
        try {
            this.server.close();
        } catch (IOException e) {
            LOG.error("server socket close fails", e);
        }
    }
}
