package org.flinkextended.flink.ml.tensorflow.cluster;

import java.io.IOException;
import java.util.Map;
import org.flinkextended.flink.ml.cluster.master.AMEvent;
import org.flinkextended.flink.ml.cluster.master.AMEventType;
import org.flinkextended.flink.ml.cluster.master.AMTransition;
import org.flinkextended.flink.ml.cluster.master.AbstractAMStateMachine;
import org.flinkextended.flink.ml.cluster.role.PsRole;
import org.flinkextended.flink.ml.cluster.role.WorkerRole;
import org.flinkextended.flink.ml.cluster.statemachine.InvalidStateTransitionException;
import org.flinkextended.flink.ml.cluster.statemachine.transition.SingleArcTransition;
import org.flinkextended.flink.ml.proto.FinishNodeRequest;
import org.flinkextended.flink.ml.proto.MLJobDef;
import org.flinkextended.flink.ml.proto.RegisterNodeRequest;
import org.flinkextended.flink.ml.util.ProtoUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/tensorflow/cluster/TFTransitions.class */
public class TFTransitions {
    private static final Logger LOG = LoggerFactory.getLogger(TFTransitions.class);

    /* loaded from: input_file:org/flinkextended/flink/ml/tensorflow/cluster/TFTransitions$FinishNode.class */
    public static class FinishNode extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public FinishNode(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        public void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            FinishNodeRequest finishNodeRequest = (FinishNodeRequest) aMEvent.getMessage();
            TFTransitions.LOG.info("Finish Node:" + ProtoUtil.protoToJson(finishNodeRequest.getNodeSpec()));
            try {
                if (this.eventReporter != null) {
                    this.eventReporter.nodeFinish(new String[]{nodeSpec2Str(finishNodeRequest.getNodeSpec())});
                }
                this.amMeta.saveFinishNodeSpec(finishNodeRequest.getNodeSpec());
                if (isWorkerZeroFinish(finishNodeRequest) && this.mlContext.isBatchMode()) {
                    TFTransitions.LOG.info("worker 0 finish and send finish cluster event!");
                    abstractAMStateMachine.sendEvent(new AMEvent(AMEventType.FINISH_CLUSTER, "", finishNodeRequest.getVersion()));
                } else if (0 == ((Integer) updateRemainJobNum(aMEvent).getOrDefault(new WorkerRole().name(), 0)).intValue() && this.mlContext.isStreamMode()) {
                    TFTransitions.LOG.info("send finish cluster event!");
                    abstractAMStateMachine.sendEvent(new AMEvent(AMEventType.FINISH_CLUSTER, "", finishNodeRequest.getVersion()));
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }

        private static boolean isWorkerZeroFinish(FinishNodeRequest finishNodeRequest) {
            boolean z = false;
            if (finishNodeRequest.getNodeSpec().getRoleName().equals(new WorkerRole().name()) && 0 == finishNodeRequest.getNodeSpec().getIndex()) {
                z = true;
            }
            return z;
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/tensorflow/cluster/TFTransitions$RegisterNode.class */
    public static class RegisterNode extends AMTransition implements SingleArcTransition<AbstractAMStateMachine, AMEvent> {
        public RegisterNode(AbstractAMStateMachine abstractAMStateMachine) {
            super(abstractAMStateMachine);
        }

        public synchronized void transition(AbstractAMStateMachine abstractAMStateMachine, AMEvent aMEvent) throws InvalidStateTransitionException {
            RegisterNodeRequest registerNodeRequest = (RegisterNodeRequest) aMEvent.getMessage();
            TFTransitions.LOG.info("Register Node:" + ProtoUtil.protoToJson(registerNodeRequest.getNodeSpec()));
            try {
                if (this.eventReporter != null) {
                    this.eventReporter.nodeRegister(new String[]{nodeSpec2Str(registerNodeRequest.getNodeSpec())});
                }
                int i = 0;
                int i2 = 0;
                for (MLJobDef mLJobDef : this.amMeta.saveNodeSpec(registerNodeRequest.getNodeSpec()).getJobList()) {
                    if (mLJobDef.getName().equals(new WorkerRole().name())) {
                        i = mLJobDef.getTasksCount();
                    } else if (mLJobDef.getName().equals(new PsRole().name())) {
                        i2 = mLJobDef.getTasksCount();
                    }
                }
                Map updateRemainJobNum = updateRemainJobNum(aMEvent);
                int intValue = updateRemainJobNum.get(new WorkerRole().name()) != null ? ((Integer) updateRemainJobNum.get(new WorkerRole().name())).intValue() : 0;
                int intValue2 = updateRemainJobNum.get(new PsRole().name()) != null ? ((Integer) updateRemainJobNum.get(new PsRole().name())).intValue() : 0;
                boolean z = false;
                if (i == intValue && i2 == intValue2) {
                    z = true;
                }
                if (z && !registerNodeRequest.getNodeSpec().getRoleName().equals(new TensorBoardRole().name())) {
                    long version = registerNodeRequest.getVersion();
                    AMEvent aMEvent2 = new AMEvent(AMEventType.COMPLETE_CLUSTER, "", version);
                    TFTransitions.LOG.info("put complete event to state machine:" + version);
                    this.stateMachine.sendEvent(aMEvent2);
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new InvalidStateTransitionException(getInternalState(), aMEvent);
            }
        }
    }
}
