package org.flinkextended.flink.ml.lib.tensorflow;

import com.google.common.base.Preconditions;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.flinkextended.flink.ml.lib.tensorflow.utils.RankUtil;
import org.flinkextended.flink.ml.lib.tensorflow.utils.TypeMapping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tensorflow.proto.framework.DataType;

/* loaded from: input_file:org/flinkextended/flink/ml/lib/tensorflow/TFInferenceUDTF.class */
public class TFInferenceUDTF extends TableFunction<Row> {
    private static final Logger LOG = LoggerFactory.getLogger(TFInferenceUDTF.class);
    private final String modelDir;
    private final String[] inputNames;
    private final String[] inputTypes;
    private final String[] inputRanks;
    private final String[] outputNames;
    private final String[] outputTypes;
    private final String[] outputRanks;
    private Properties props;
    private transient TFInference tfInference;
    private final int batchSize;
    private transient BlockingQueue<Object[]> rowCache;
    private ExecutorService predictService;
    private transient Future predictFuture;
    private static final String SEP = ",";
    private volatile boolean runningFlag = true;
    private volatile boolean failed = false;

    /* loaded from: input_file:org/flinkextended/flink/ml/lib/tensorflow/TFInferenceUDTF$InferenceExceptionHandler.class */
    class InferenceExceptionHandler implements Thread.UncaughtExceptionHandler {
        InferenceExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            th.printStackTrace();
            TFInferenceUDTF.this.rowCache.clear();
            TFInferenceUDTF.this.failed = true;
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/lib/tensorflow/TFInferenceUDTF$PredictRunner.class */
    private class PredictRunner implements Runnable {
        private List<Object[]> result;

        private PredictRunner() {
            this.result = new ArrayList(TFInferenceUDTF.this.batchSize);
        }

        @Override // java.lang.Runnable
        public void run() {
            Object[] objArr;
            while (TFInferenceUDTF.this.runningFlag) {
                if (TFInferenceUDTF.this.rowCache.isEmpty()) {
                    try {
                        objArr = (Object[]) TFInferenceUDTF.this.rowCache.poll(1L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        TFInferenceUDTF.this.runningFlag = false;
                    }
                    if (null != objArr) {
                        this.result.add(objArr);
                    }
                } else {
                    TFInferenceUDTF.this.rowCache.drainTo(this.result);
                }
                for (Row row : TFInferenceUDTF.this.tfInference.inference(this.result)) {
                    TFInferenceUDTF.this.collect(row);
                }
                this.result.clear();
            }
            if (TFInferenceUDTF.this.rowCache.isEmpty()) {
                return;
            }
            TFInferenceUDTF.this.rowCache.drainTo(this.result);
            for (Row row2 : TFInferenceUDTF.this.tfInference.inference(this.result)) {
                TFInferenceUDTF.this.collect(row2);
            }
            this.result.clear();
        }
    }

    private String[] trim(String[] strArr) {
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = strArr[i].trim();
        }
        return strArr;
    }

    public TFInferenceUDTF(String str, String str2, String str3, String str4, String str5, String str6, String str7, Properties properties, int i) {
        this.modelDir = str;
        this.inputNames = str2.split(SEP);
        trim(this.inputNames);
        this.inputTypes = str3.split(SEP);
        trim(this.inputTypes);
        this.inputRanks = str4.split(SEP);
        trim(this.inputRanks);
        this.outputNames = str5.split(SEP);
        trim(this.outputNames);
        this.outputTypes = str6.split(SEP);
        trim(this.outputTypes);
        this.outputRanks = str7.split(SEP);
        trim(this.outputRanks);
        this.props = properties;
        this.batchSize = i;
    }

    public void open(FunctionContext functionContext) throws Exception {
        super.open(functionContext);
        this.rowCache = new LinkedBlockingQueue(this.batchSize);
        DataType[] convertToDataTypes = TypeMapping.convertToDataTypes(this.inputTypes);
        DataType[] convertToDataTypes2 = TypeMapping.convertToDataTypes(this.outputTypes);
        this.tfInference = new TFInference(this.modelDir, this.inputNames, convertToDataTypes, RankUtil.toRanks(this.inputRanks), this.outputNames, convertToDataTypes2, RankUtil.toRanks(this.outputRanks), this.props);
        this.predictService = Executors.newFixedThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("inference-thread");
            thread.setUncaughtExceptionHandler(new InferenceExceptionHandler());
            return thread;
        });
        this.predictFuture = this.predictService.submit(new PredictRunner());
    }

    public void close() throws Exception {
        super.close();
        this.runningFlag = false;
        if (null != this.predictFuture) {
            this.predictFuture.get();
        }
        if (null != this.predictService) {
            this.predictService.shutdown();
            this.predictService.awaitTermination(5L, TimeUnit.SECONDS);
        }
        if (null != this.tfInference) {
            this.tfInference.close();
        }
    }

    public TypeInformation<Row> getResultType() {
        return Types.ROW(this.outputNames, TypeMapping.convertToTypeInformation(this.outputTypes, RankUtil.toRanks(this.outputRanks)));
    }

    public void eval(Object... objArr) {
        if (this.failed) {
            throw new RuntimeException("inference thread failed!");
        }
        Preconditions.checkArgument(objArr.length == this.inputNames.length, "Input fields length mismatch");
        try {
            this.rowCache.put(objArr);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }
}
