package org.flinkextended.flink.ml.operator.ops.inputformat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.configuration.Configuration;
import org.flinkextended.flink.ml.cluster.ClusterConfig;
import org.flinkextended.flink.ml.cluster.ExecutionMode;
import org.flinkextended.flink.ml.cluster.node.MLContext;
import org.flinkextended.flink.ml.coding.CodingException;
import org.flinkextended.flink.ml.coding.Decoding;
import org.flinkextended.flink.ml.data.RecordReader;
import org.flinkextended.flink.ml.operator.util.ColumnInfos;
import org.flinkextended.flink.ml.util.MLException;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/AbstractNodeInputFormatTest.class */
public class AbstractNodeInputFormatTest {
    private DummyNodeInputFormat nodeInputFormat;

    /* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/AbstractNodeInputFormatTest$DummyNodeInputFormat.class */
    private static class DummyNodeInputFormat extends AbstractNodeInputFormat<Integer> {
        private final AtomicBoolean finished;

        public DummyNodeInputFormat(ClusterConfig clusterConfig) {
            super(clusterConfig);
            this.finished = new AtomicBoolean(false);
            this.closeTimeoutMs = 1000L;
        }

        public void configure(Configuration configuration) {
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public NodeInputSplit[] m2createInputSplits(int i) throws IOException {
            return new NodeInputSplit[0];
        }

        protected MLContext prepareMLContext(Integer num) throws MLException {
            return new MLContext(ExecutionMode.OTHER, "worker", 0, this.clusterConfig.getNodeTypeCntMap(), this.clusterConfig.getEntryFuncName(), this.clusterConfig.getProperties(), this.clusterConfig.getPythonVirtualEnvZipPath(), ColumnInfos.dummy().getNameToTypeMap());
        }

        protected Runnable getNodeServerRunnable(MLContext mLContext) {
            return () -> {
                while (!this.finished.get()) {
                    synchronized (this.finished) {
                        try {
                            if (this.finished.get()) {
                                return;
                            } else {
                                this.finished.wait();
                            }
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            };
        }

        void preparePythonFiles() {
        }

        public void markFinish() {
            synchronized (this.finished) {
                this.finished.set(true);
                this.finished.notify();
            }
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/AbstractNodeInputFormatTest$TestDecoding.class */
    public static class TestDecoding implements Decoding<Integer> {
        public TestDecoding(MLContext mLContext) {
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public Integer m3decode(byte[] bArr) throws CodingException {
            return Integer.valueOf(ByteBuffer.wrap(bArr).getInt());
        }
    }

    /* loaded from: input_file:org/flinkextended/flink/ml/operator/ops/inputformat/AbstractNodeInputFormatTest$TestRecordReader.class */
    public static class TestRecordReader implements RecordReader {
        private int count = 0;

        public TestRecordReader(MLContext mLContext) {
        }

        public byte[] tryRead() throws IOException {
            ByteBuffer allocate = ByteBuffer.allocate(4);
            int i = this.count;
            this.count = i + 1;
            return allocate.putInt(i).array();
        }

        public boolean isReachEOF() {
            return this.count > 1;
        }

        public byte[] read() throws IOException {
            return tryRead();
        }

        public void close() throws IOException {
        }
    }

    @Before
    public void setUp() throws Exception {
        this.nodeInputFormat = new DummyNodeInputFormat(ClusterConfig.newBuilder().setNodeEntry("entry.py", "main").setProperty("sys:record_reader_class", TestRecordReader.class.getName()).setProperty("sys:decoding_class", TestDecoding.class.getName()).build());
        this.nodeInputFormat.setRuntimeContext((RuntimeContext) Mockito.mock(RuntimeContext.class));
    }

    @Test
    public void testGetInputSplitAssigner() {
        MatcherAssert.assertThat(this.nodeInputFormat.getInputSplitAssigner(new NodeInputSplit[0]), Matchers.instanceOf(DefaultInputSplitAssigner.class));
    }

    @Test
    public void testGetStatistics() throws IOException {
        Assert.assertNull(this.nodeInputFormat.getStatistics(null));
    }

    @Test
    public void testOpen() throws IOException {
        DummyNodeInputFormat dummyNodeInputFormat = (DummyNodeInputFormat) Mockito.spy(this.nodeInputFormat);
        dummyNodeInputFormat.open(new NodeInputSplit(1, 0));
        ((DummyNodeInputFormat) Mockito.verify(dummyNodeInputFormat, Mockito.times(1))).prepareMLContext((Integer) org.mockito.Matchers.any());
        ((DummyNodeInputFormat) Mockito.verify(dummyNodeInputFormat, Mockito.times(1))).getNodeServerRunnable((MLContext) org.mockito.Matchers.any());
        ((DummyNodeInputFormat) Mockito.verify(dummyNodeInputFormat, Mockito.times(1))).preparePythonFiles();
    }

    @Test
    public void testReachedEnd() throws IOException, ExecutionException, InterruptedException {
        this.nodeInputFormat.open(new NodeInputSplit(1, 0));
        Assert.assertFalse(this.nodeInputFormat.reachedEnd());
        this.nodeInputFormat.markFinish();
        this.nodeInputFormat.waitServerFutureFinish();
        Assert.assertTrue(this.nodeInputFormat.reachedEnd());
    }

    @Test
    public void testNextRecord() throws IOException, ExecutionException, InterruptedException {
        this.nodeInputFormat.open(new NodeInputSplit(1, 0));
        Assert.assertFalse(this.nodeInputFormat.reachedEnd());
        Assert.assertEquals(0, this.nodeInputFormat.nextRecord(null));
        Assert.assertEquals(1, this.nodeInputFormat.nextRecord(null));
        Assert.assertFalse(this.nodeInputFormat.reachedEnd());
        this.nodeInputFormat.markFinish();
        this.nodeInputFormat.waitServerFutureFinish();
        Assert.assertTrue(this.nodeInputFormat.reachedEnd());
    }

    @Test
    public void testClose() throws IOException, InterruptedException, ExecutionException {
        DummyNodeInputFormat dummyNodeInputFormat = (DummyNodeInputFormat) Mockito.spy(this.nodeInputFormat);
        dummyNodeInputFormat.open(new NodeInputSplit(1, 0));
        dummyNodeInputFormat.markFinish();
        dummyNodeInputFormat.waitServerFutureFinish();
        dummyNodeInputFormat.close();
        Assert.assertTrue(dummyNodeInputFormat.isClosed());
    }

    @Test
    public void testCloseTimeout() throws IOException {
        DummyNodeInputFormat dummyNodeInputFormat = (DummyNodeInputFormat) Mockito.spy(this.nodeInputFormat);
        dummyNodeInputFormat.open(new NodeInputSplit(1, 0));
        dummyNodeInputFormat.close();
        Assert.assertTrue(dummyNodeInputFormat.isClosed());
    }
}
