package org.flinkextended.flink.ml.operator.ops.inputformat;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.role.AMRole;
import org.flinkextended.flink.ml.cluster.role.BaseRole;
import org.flinkextended.flink.ml.cluster.rpc.AppMasterServer;
import org.flinkextended.flink.ml.cluster.rpc.NodeServer;
import org.flinkextended.flink.ml.data.DataExchange;
import org.flinkextended.flink.ml.operator.hook.FlinkOpHookManager;
import org.flinkextended.flink.ml.operator.ops.ResourcesUtils;
import org.flinkextended.flink.ml.operator.util.ColumnInfos;
import org.flinkextended.flink.ml.operator.util.PythonFileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/MLInputFormat.class */
/**
 * Flink input format that runs one node of a machine-learning cluster inside a source task.
 *
 * <p>When the format is opened it builds an {@link MLContext} for the assigned split, starts
 * either an {@link AppMasterServer} (when the role is exactly {@link AMRole}) or a
 * {@link NodeServer} on a daemon thread, opens the configured hooks and creates a
 * {@link DataExchange} from which records are read.
 *
 * @param <OUT> type of the records produced by this input format
 */
public class MLInputFormat<OUT> extends RichInputFormat<OUT, NodeInputSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(MLInputFormat.class);

    private final MLConfig mlConfig;
    private final ExecutionMode mode;
    private final BaseRole role;
    private final TypeInformation<OUT> outTI;

    /** Task wrapping the server runnable; {@code null} until {@link #open} and after {@link #close}. */
    private transient FutureTask<Void> serverFuture;
    private transient MLContext mlContext;
    /** Doubles as the monitor guarding server shutdown and as the "already shut down" flag. */
    private final AtomicBoolean isClose = new AtomicBoolean(false);
    private transient FlinkOpHookManager hookManager;
    private transient DataExchange<OUT, OUT> dataExchange;

    /**
     * Creates the input format.
     *
     * @param executionMode   execution mode handed to the {@link MLContext}
     * @param baseRole        role of the node hosted by this format
     * @param mLConfig        cluster configuration
     * @param typeInformation type information of the produced records
     */
    public MLInputFormat(ExecutionMode executionMode, BaseRole baseRole, MLConfig mLConfig, TypeInformation<OUT> typeInformation) {
        this.mode = executionMode;
        this.role = baseRole;
        this.mlConfig = mLConfig;
        this.outTI = typeInformation;
    }

    /** No-op: everything this format needs is supplied through the constructor. */
    public void configure(Configuration configuration) {
    }

    /**
     * Statistics are not provided by this format.
     *
     * @return always {@code null}
     */
    public BaseStatistics getStatistics(BaseStatistics baseStatistics) {
        return null;
    }

    /**
     * Creates one split per configured node of this format's role. The parallelism is looked up
     * in the role parallelism map under the role name and defaults to 1 when absent.
     *
     * @param i minimum number of splits requested by the framework; not used
     * @return one {@link NodeInputSplit} per node, indexed from 0
     */
    public NodeInputSplit[] createInputSplits(int i) {
        final int parallelism = this.mlConfig.getRoleParallelismMap().getOrDefault(this.role.name(), 1);
        final NodeInputSplit[] splits = new NodeInputSplit[parallelism];
        for (int index = 0; index < parallelism; index++) {
            splits[index] = new NodeInputSplit(parallelism, index);
        }
        return splits;
    }

    /**
     * Alias kept for callers compiled against the decompiler-renamed method name.
     *
     * @deprecated use {@link #createInputSplits(int)}
     */
    @Deprecated
    public NodeInputSplit[] m9createInputSplits(int i) {
        return createInputSplits(i);
    }

    /** Returns a {@link DefaultInputSplitAssigner} over the given splits. */
    public InputSplitAssigner getInputSplitAssigner(NodeInputSplit[] nodeInputSplitArr) {
        return new DefaultInputSplitAssigner(nodeInputSplitArr);
    }

    /**
     * Builds the node context for the given split, starts the server thread, opens the hooks and
     * creates the data exchange.
     *
     * @param nodeInputSplit split assigned to this task; its split number is passed to the
     *                       {@link MLContext}
     * @throws IOException if starting the server thread, opening the hooks or creating the data
     *                     exchange fails; the original failure is attached as the cause
     */
    public void open(NodeInputSplit nodeInputSplit) throws IOException {
        ResourcesUtils.parseGpuInfo(getRuntimeContext(), this.mlConfig);
        this.mlContext = new MLContext(this.mode, this.mlConfig, this.role.name(), nodeInputSplit.getSplitNumber(), this.mlConfig.getEnvPath(), ColumnInfos.dummy().getNameToTypeMap());
        if (isAppMaster()) {
            this.serverFuture = new FutureTask<>(new AppMasterServer(this.mlContext), null);
        } else {
            PythonFileUtil.preparePythonFilesForExec(getRuntimeContext(), this.mlContext);
            this.serverFuture = new FutureTask<>(new NodeServer(this.mlContext, this.role.name()), null);
        }
        // A fresh server now exists, so a later close() must join and release it even if this
        // instance was closed before.
        this.isClose.set(false);
        try {
            Thread serverThread = new Thread(this.serverFuture);
            serverThread.setDaemon(true);
            serverThread.setName("NodeServer_" + this.mlContext.getIdentity());
            serverThread.start();
            LOG.info("start: {}", this.mlContext.getIdentity());
            this.mlContext.getOutputQueue().markFinished();
            try {
                this.hookManager = new FlinkOpHookManager(this.mlContext.getHookClassNames());
                this.hookManager.open();
                this.dataExchange = new DataExchange<>(this.mlContext);
            } catch (Exception e) {
                // Logged once, with the full cause chain, by the enclosing handler.
                throw new IOException(e);
            }
        } catch (Exception e) {
            LOG.error("Fail to start node service.", e);
            // Same message as before, but the cause is kept so callers see the real stack trace.
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Reports whether no further records will be produced.
     *
     * @return {@code true} for the application-master role, once the server task is done, or once
     *         the record reader has reached end of file
     */
    public boolean reachedEnd() throws IOException {
        return isAppMaster() || this.serverFuture.isDone() || this.dataExchange.getRecordReader().isReachEOF();
    }

    /**
     * Reads the next record from the data exchange.
     *
     * @param out reuse object offered by the framework; not used
     * @return the record returned by {@code dataExchange.read(true)}
     */
    public OUT nextRecord(OUT out) throws IOException {
        return (OUT) this.dataExchange.read(true);
    }

    /**
     * Waits for the server task to finish, releases the node context and closes the hooks.
     * The hooks are closed even when the server shutdown fails; in that case the shutdown
     * failure is the one thrown and a hook failure is attached to it as suppressed.
     *
     * @throws IOException if the server task failed, or if releasing the context or closing the
     *                     hooks fails
     */
    public void close() throws IOException {
        try {
            shutdownServer();
        } catch (IOException | RuntimeException e) {
            try {
                closeHookManager();
            } catch (IOException hookFailure) {
                e.addSuppressed(hookFailure);
            }
            throw e;
        }
        closeHookManager();
    }

    /** Joins the server task and releases the context; does nothing once already shut down. */
    private void shutdownServer() throws IOException {
        boolean interrupted = false;
        synchronized (this.isClose) {
            if (this.isClose.get()) {
                return;
            }
            try {
                if (this.serverFuture != null && !this.serverFuture.isCancelled()) {
                    this.serverFuture.get();
                }
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.warn("{} interrupted during waiting server join {}.", describeNode(), e.getMessage());
                this.serverFuture.cancel(true);
            } catch (ExecutionException e) {
                LOG.error("{} node server failed {}", describeNode(), e.getMessage());
                throw new IOException(e);
            } finally {
                this.serverFuture = null;
                try {
                    if (this.mlContext != null) {
                        this.mlContext.close();
                        this.mlContext = null;
                    }
                } finally {
                    // Restore the flag only after cleanup so the context is not closed while
                    // the thread is marked interrupted.
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
            this.isClose.set(true);
        }
    }

    /** Closes the hook manager at most once, wrapping any failure in an {@link IOException}. */
    private void closeHookManager() throws IOException {
        final FlinkOpHookManager manager = this.hookManager;
        if (manager == null) {
            return;
        }
        this.hookManager = null;
        try {
            manager.close();
        } catch (Exception e) {
            LOG.error("Fail to close hook manager.", e);
            throw new IOException(e);
        }
    }

    /** Returns whether the hosted role is exactly {@link AMRole} (subclasses do not count). */
    private boolean isAppMaster() {
        return this.role.getClass().equals(AMRole.class);
    }

    /** Returns the node identity for log messages, tolerating a context that is not set. */
    private String describeNode() {
        return this.mlContext == null ? "<no context>" : String.valueOf(this.mlContext.getIdentity());
    }

    @VisibleForTesting
    public MLConfig getMlConfig() {
        return this.mlConfig;
    }

    @VisibleForTesting
    public BaseRole getRole() {
        return this.role;
    }
}
