package org.flinkextended.flink.ml.operator.ops;

import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.MLConfig;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.cluster.role.BaseRole;
import org.flinkextended.flink.ml.cluster.rpc.NodeServer;
import org.flinkextended.flink.ml.data.DataExchange;
import org.flinkextended.flink.ml.operator.util.PythonFileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/MLMapFunction.class */
/**
 * Feeds input records into a {@link DataExchange} and forwards whatever the exchange
 * produces to a Flink {@link Collector}, while a {@link NodeServer} runs on a daemon
 * thread for the lifetime of the function.
 *
 * <p>Lifecycle: {@link #open(RuntimeContext)} builds the {@link MLContext}, the data
 * exchange and the server thread; {@link #flatMap(Object, Collector)} is called per
 * record; {@link #close()} drains remaining output, waits for the server and releases
 * the context.
 *
 * <p>NOTE(review): field modifiers are left exactly as found (no added {@code final} /
 * {@code transient}) so that the default serialVersionUID of this {@link Serializable}
 * class does not change.
 *
 * @param <IN> type of the records written to the exchange
 * @param <OUT> type of the records read back from the exchange
 */
public class MLMapFunction<IN, OUT> implements Closeable, Serializable {
    private BaseRole role;
    private MLConfig config;
    private TypeInformation<IN> inTI;
    private TypeInformation<OUT> outTI;
    private MLContext mlContext;
    private FutureTask<Void> serverFuture;
    private ExecutionMode mode;
    private transient DataExchange<IN, OUT> dataExchange;
    /** Last collector handed to {@link #flatMap}; reused by {@link #close()} for the final drain. */
    private volatile Collector<OUT> collector = null;
    private static final Logger LOG = LoggerFactory.getLogger(MLMapFunction.class);

    /**
     * Stores the configuration needed to build the runtime objects later in {@link #open}.
     *
     * @param executionMode execution mode passed to the {@link MLContext}
     * @param baseRole role whose name is used for the context and the node server
     * @param mLConfig cluster configuration
     * @param typeInformation type information of the input records
     * @param typeInformation2 type information of the produced records
     */
    public MLMapFunction(ExecutionMode executionMode, BaseRole baseRole, MLConfig mLConfig, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2) {
        this.mode = executionMode;
        this.role = baseRole;
        this.config = mLConfig;
        this.outTI = typeInformation2;
        this.inTI = typeInformation;
    }

    /**
     * Creates the {@link MLContext} and {@link DataExchange} and starts the
     * {@link NodeServer} on a daemon thread.
     *
     * @param runtimeContext Flink runtime context of the current subtask
     * @throws Exception if preparing resources fails; failures while starting the
     *     server thread are reported as {@link IOException} carrying the original cause
     */
    public void open(RuntimeContext runtimeContext) throws Exception {
        ResourcesUtils.parseGpuInfo(runtimeContext, this.config);
        this.mlContext = new MLContext(this.mode, this.config, this.role.name(), runtimeContext.getIndexOfThisSubtask(), this.config.getEnvPath(), (Map) null);
        PythonFileUtil.preparePythonFilesForExec(runtimeContext, this.mlContext);
        this.dataExchange = new DataExchange<>(this.mlContext);
        try {
            this.serverFuture = new FutureTask<>(new NodeServer(this.mlContext, this.role.name()), null);
            Thread serverThread = new Thread(this.serverFuture);
            serverThread.setDaemon(true);
            serverThread.setName("NodeServer_" + this.mlContext.getIdentity());
            serverThread.start();
            System.out.println("start:" + this.mlContext.getRoleName() + " index:" + this.mlContext.getIndex());
        } catch (Exception e) {
            LOG.error("Fail to start node service.", e);
            // Same message as before, but keep the cause so the stack trace is not lost.
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * Marks the output queue finished, drains the remaining records to the last known
     * collector, waits for the node server to terminate and closes the {@link MLContext}.
     *
     * <p>Cleanup runs exactly once in the {@code finally} block regardless of how the
     * wait ends. If the waiting thread is interrupted, the server task is cancelled and
     * the interrupt status is restored after cleanup.
     *
     * @throws RuntimeException wrapping the {@link ExecutionException} if the node server failed
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.mlContext != null && this.mlContext.getOutputQueue() != null) {
            this.mlContext.getOutputQueue().markFinished();
        }
        boolean interrupted = false;
        try {
            drainRead(this.collector, true);
            if (this.serverFuture != null && !this.serverFuture.isCancelled()) {
                this.serverFuture.get();
            }
        } catch (ExecutionException e) {
            LOG.error(this.mlContext.getIdentity() + " node server failed", e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            LOG.error("Interrupted waiting for server join {}.", e.getMessage());
            if (this.serverFuture != null) {
                this.serverFuture.cancel(true);
            }
            interrupted = true;
        } finally {
            this.serverFuture = null;
            // dataExchange is only set in open(); guard so a failed/never-run open() does
            // not turn close() into a NullPointerException that hides the real error.
            if (this.dataExchange != null) {
                LOG.info("Records output: " + this.dataExchange.getReadRecords());
            }
            if (this.mlContext != null) {
                try {
                    this.mlContext.close();
                } catch (IOException e) {
                    LOG.error("Fail to close mlContext.", e);
                }
                this.mlContext = null;
            }
            if (interrupted) {
                // Restore the flag only after cleanup so the cleanup itself is not disturbed.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Writes one input record to the exchange, forwarding any available output to
     * {@code collector} before every write attempt. Retries (yielding the thread in
     * between) until {@link DataExchange#write} reports success.
     *
     * @param in record to write
     * @param collector collector receiving records read from the exchange; also
     *     remembered for the final drain in {@link #close()}
     * @throws Exception if writing to the exchange fails
     */
    public void flatMap(IN in, Collector<OUT> collector) throws Exception {
        this.collector = collector;
        boolean written;
        do {
            drainRead(collector, false);
            written = this.dataExchange.write(in);
            if (!written) {
                Thread.yield();
            }
        } while (!written);
    }

    /**
     * Returns the type information of the records this function produces.
     *
     * @return the output type information given at construction time
     */
    public TypeInformation<OUT> getProducedType() {
        return this.outTI;
    }

    /**
     * Reads records from the exchange and hands them to {@code collector} until the
     * exchange returns {@code null} or a read fails.
     *
     * <p>A failed read ends the drain: an {@link InterruptedIOException} additionally
     * cancels the node server task, any other {@link IOException} is only logged.
     * NOTE(review): stopping (rather than retrying) after a failed read is a deliberate
     * choice to avoid spinning on a persistent error — confirm against the intended
     * retry semantics.
     *
     * @param collector receiver of the records read
     * @param readMode flag forwarded unchanged to {@link DataExchange#read}
     */
    private void drainRead(Collector<OUT> collector, boolean readMode) {
        if (this.dataExchange == null) {
            // open() has not run (or failed before creating the exchange): nothing to drain.
            return;
        }
        while (true) {
            OUT record;
            try {
                @SuppressWarnings("unchecked")
                OUT next = (OUT) this.dataExchange.read(readMode);
                record = next;
            } catch (InterruptedIOException e) {
                LOG.info("{} Reading from is interrupted, canceling the server", this.mlContext.getIdentity());
                if (this.serverFuture != null) {
                    this.serverFuture.cancel(true);
                }
                return;
            } catch (IOException e) {
                LOG.error("Fail to read data from python.", e);
                return;
            }
            if (record == null) {
                return;
            }
            collector.collect(record);
        }
    }
}
