package org.flinkextended.flink.ml.operator.ops.inputformat;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.core.io.InputSplitAssigner;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.data.DataExchange;
import org.flinkextended.flink.ml.operator.hook.FlinkOpHookManager;
import org.flinkextended.flink.ml.operator.util.PythonFileUtil;
import org.flinkextended.flink.ml.util.MLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/AbstractNodeInputFormat.class */
public abstract class AbstractNodeInputFormat<OUT> extends RichInputFormat<OUT, NodeInputSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeInputFormat.class);
    protected final ClusterConfig clusterConfig;
    private MLContext mlContext;
    private FlinkOpHookManager hookManager;
    private DataExchange<OUT, OUT> dataExchange;
    private FutureTask<Void> serverFuture;
    private Thread serverThread;
    protected long closeTimeoutMs = 30000;
    private final AtomicBoolean isClose = new AtomicBoolean(false);

    public AbstractNodeInputFormat(ClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
    }

    public InputSplitAssigner getInputSplitAssigner(NodeInputSplit[] nodeInputSplitArr) {
        return new DefaultInputSplitAssigner(nodeInputSplitArr);
    }

    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
        return null;
    }

    public void open(NodeInputSplit nodeInputSplit) throws IOException {
        this.mlContext = prepareMLContext(Integer.valueOf(nodeInputSplit.getSplitNumber()));
        preparePythonFiles();
        this.serverThread = runRunnable(getNodeServerRunnable(this.mlContext), "NodeServer_" + maybeGetIdentity());
        this.mlContext.getOutputQueue().markFinished();
        iniAndRunHookOpen();
        this.dataExchange = new DataExchange<>(this.mlContext);
    }

    public boolean reachedEnd() throws IOException {
        return this.serverFuture.isDone();
    }

    public OUT nextRecord(OUT out) throws IOException {
        OUT out2;
        Object read = this.dataExchange.read(true);
        while (true) {
            out2 = (OUT) read;
            if (out2 != null || reachedEnd()) {
                break;
            }
            try {
                Thread.sleep(1000L);
                read = this.dataExchange.read(true);
            } catch (InterruptedException e) {
            }
        }
        return out2;
    }

    public void close() throws IOException {
        FutureTask<Void> futureTask;
        boolean isAlive;
        synchronized (this.isClose) {
            if (!this.isClose.get()) {
                LOG.info("Closing NodeInputFormat {}", maybeGetIdentity());
                try {
                    try {
                        try {
                            if (this.serverFuture != null && !this.serverFuture.isCancelled()) {
                                this.serverFuture.get(this.closeTimeoutMs, TimeUnit.MILLISECONDS);
                            }
                            if (this.serverFuture != null) {
                                this.serverFuture.cancel(true);
                                while (true) {
                                    try {
                                        this.serverThread.join(30000L);
                                    } catch (InterruptedException e) {
                                        LOG.error("Fail to wait for InputFormat to exit", e);
                                    }
                                    if (!this.serverThread.isAlive()) {
                                        break;
                                    }
                                    LOG.warn("InputFormat fail to exit in 30 second, interrupting...");
                                    this.serverThread.interrupt();
                                }
                            }
                            this.serverFuture = null;
                            if (this.mlContext != null) {
                                this.mlContext.close();
                                this.mlContext = null;
                            }
                        } finally {
                            if (futureTask != null) {
                                while (true) {
                                    if (!isAlive) {
                                        break;
                                    }
                                }
                            }
                        }
                    } catch (TimeoutException e2) {
                        LOG.error("Timeout on waiting node server {} to finish", maybeGetIdentity());
                        if (this.serverFuture != null) {
                            this.serverFuture.cancel(true);
                            while (true) {
                                try {
                                    this.serverThread.join(30000L);
                                } catch (InterruptedException e3) {
                                    LOG.error("Fail to wait for InputFormat to exit", e3);
                                }
                                if (!this.serverThread.isAlive()) {
                                    break;
                                }
                                LOG.warn("InputFormat fail to exit in 30 second, interrupting...");
                                this.serverThread.interrupt();
                            }
                        }
                        this.serverFuture = null;
                        if (this.mlContext != null) {
                            this.mlContext.close();
                            this.mlContext = null;
                        }
                    }
                } catch (InterruptedException e4) {
                    LOG.error("Fail to join server {}", maybeGetIdentity(), e4);
                    if (this.serverFuture != null) {
                        this.serverFuture.cancel(true);
                        while (true) {
                            try {
                                this.serverThread.join(30000L);
                            } catch (InterruptedException e5) {
                                LOG.error("Fail to wait for InputFormat to exit", e5);
                            }
                            if (!this.serverThread.isAlive()) {
                                break;
                            }
                            LOG.warn("InputFormat fail to exit in 30 second, interrupting...");
                            this.serverThread.interrupt();
                        }
                    }
                    this.serverFuture = null;
                    if (this.mlContext != null) {
                        this.mlContext.close();
                        this.mlContext = null;
                    }
                } catch (ExecutionException e6) {
                    LOG.error(maybeGetIdentity() + " node server failed {}", e6.getMessage());
                    throw new IOException(e6);
                }
                this.isClose.set(true);
            }
        }
        maybeRunHookClose();
    }

    private Thread runRunnable(Runnable runnable, String str) throws IOException {
        this.serverFuture = new FutureTask<>(runnable, null);
        try {
            Thread thread = new Thread(this.serverFuture);
            thread.setDaemon(true);
            thread.setName(str);
            thread.start();
            LOG.info("start: {}", str);
            return thread;
        } catch (Exception e) {
            LOG.error("Fail to start node service.", e);
            throw new IOException(e.getMessage());
        }
    }

    private void iniAndRunHookOpen() throws IOException {
        try {
            this.hookManager = new FlinkOpHookManager(this.mlContext.getHookClassNames());
            this.hookManager.open();
        } catch (Exception e) {
            e.printStackTrace();
            throw new IOException(e);
        }
    }

    private void maybeRunHookClose() throws IOException {
        if (null != this.hookManager) {
            try {
                this.hookManager.close();
            } catch (Exception e) {
                e.printStackTrace();
                throw new IOException(e);
            }
        }
    }

    protected abstract MLContext prepareMLContext(Integer num) throws MLException;

    protected abstract Runnable getNodeServerRunnable(MLContext mLContext);

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitServerFutureFinish() throws ExecutionException, InterruptedException {
        this.serverFuture.get();
    }

    @VisibleForTesting
    void preparePythonFiles() throws IOException {
        PythonFileUtil.preparePythonFilesForExec(getRuntimeContext(), this.mlContext);
    }

    @VisibleForTesting
    boolean isClosed() {
        return this.isClose.get();
    }

    @VisibleForTesting
    FutureTask<Void> getServerFuture() {
        return this.serverFuture;
    }

    private String maybeGetIdentity() {
        return this.mlContext == null ? "" : this.mlContext.getIdentity();
    }
}
