package alluxio.client.block.stream;

import alluxio.ClientContext;
import alluxio.ConfigurationTestUtils;
import alluxio.client.block.stream.GrpcDataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.grpc.Chunk;
import alluxio.grpc.ReadRequest;
import alluxio.grpc.ReadResponse;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.util.io.BufferUtils;
import alluxio.wire.WorkerNetAddress;
import com.google.protobuf.ByteString;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Unit tests for {@link GrpcDataReader}.
 *
 * <p>The block worker client and the request stream observer are Mockito mocks. Each test
 * captures the response observer that the reader hands to
 * {@code BlockWorkerClient#readBlock}, feeds it randomly generated chunks from a background
 * thread, and compares a checksum of the bytes the reader returns with the checksum of the
 * bytes that were sent.
 */
@PrepareForTest({FileSystemContext.class, WorkerNetAddress.class})
@RunWith(PowerMockRunner.class)
public final class GrpcDataReaderTest {
    /** Chunk size (in bytes) requested from the worker and used to slice fake responses. */
    private static final int CHUNK_SIZE = 1024;
    /** Pool on which the fake server responses are pushed to the reader. */
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);
    private static final Random RANDOM = new Random();
    private static final long BLOCK_ID = 1;
    /** 1024 full chunks followed by one partial chunk (1048917 bytes in total). */
    private static final long FILE_LENGTH = CHUNK_SIZE * 1024 + CHUNK_SIZE / 3;

    private FileSystemContext mContext;
    private WorkerNetAddress mAddress;
    private BlockWorkerClient mClient;
    private GrpcDataReader.Factory mFactory;
    private ClientCallStreamObserver<ReadRequest> mRequestObserver;

    /**
     * Wires the mocked context, worker address, block worker client and request observer
     * together and builds the reader factory under test.
     */
    // Mockito can only hand back raw mocks/matchers for generic types, so the unchecked
    // conversions below are unavoidable.
    @Before
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void before() throws Exception {
        mContext = PowerMockito.mock(FileSystemContext.class);
        PowerMockito.when(mContext.getClientContext())
                .thenReturn(ClientContext.create(ConfigurationTestUtils.defaults()));
        PowerMockito.when(mContext.getClusterConf()).thenReturn(ConfigurationTestUtils.defaults());
        mAddress = Mockito.mock(WorkerNetAddress.class);
        ReadRequest readRequest =
                ReadRequest.newBuilder().setBlockId(BLOCK_ID).setChunkSize(CHUNK_SIZE).build();
        mFactory = new GrpcDataReader.Factory(mContext, mAddress, readRequest);

        mClient = Mockito.mock(BlockWorkerClient.class);
        mRequestObserver = Mockito.mock(ClientCallStreamObserver.class);
        PowerMockito.when(mContext.acquireBlockWorkerClient(mAddress))
                .thenReturn(new NoopClosableResource(mClient));
        PowerMockito.when(mClient.readBlock(Matchers.any(StreamObserver.class)))
                .thenReturn(mRequestObserver);
        PowerMockito.when(mRequestObserver.isReady()).thenReturn(true);
    }

    @After
    public void after() throws Exception {
        mClient.close();
    }

    /**
     * Reads from a stream that completes without sending any chunk.
     */
    @Test
    public void readEmptyFile() throws Exception {
        try (DataReader reader = create(0, 10)) {
            setReadResponses(mClient, 0, 0, 0);
            Assert.assertNull(reader.readChunk());
        }
        // Validate only after the reader is closed, and outside the try block so that a failed
        // assertion cannot trigger a second close().
        validateReadRequestSent(mClient, 0, 10, true, CHUNK_SIZE);
    }

    /**
     * Reads every byte of the block and checks the checksum of all of them.
     */
    @Test(timeout = 60000)
    public void readFullFile() throws Exception {
        long length = FILE_LENGTH;
        try (DataReader reader = create(0, length)) {
            long expectedChecksum = setReadResponses(mClient, length, 0, length - 1);
            long actualChecksum = checkChunks(reader, 0, length);
            Assert.assertEquals(expectedChecksum, actualChecksum);
        }
        validateReadRequestSent(mClient, 0, length, true, CHUNK_SIZE);
    }

    /**
     * Creates the reader at a non-zero offset and checks the checksum of a sub-range of the
     * bytes that are streamed back.
     */
    @Test(timeout = 60000)
    public void readPartialFile() throws Exception {
        long length = FILE_LENGTH;
        long offset = 10;
        long checksumStart = 100;
        long bytesToRead = length / 3;
        try (DataReader reader = create(offset, length)) {
            long expectedChecksum =
                    setReadResponses(mClient, length, checksumStart, bytesToRead - 1);
            long actualChecksum = checkChunks(reader, checksumStart, bytesToRead);
            Assert.assertEquals(expectedChecksum, actualChecksum);
        }
        validateReadRequestSent(mClient, offset, length, true, CHUNK_SIZE);
    }

    /**
     * Creates the reader with {@link Long#MAX_VALUE} as the requested length while the fake
     * server only sends {@code FILE_LENGTH} bytes.
     */
    @Test(timeout = 60000)
    public void fileLengthUnknown() throws Exception {
        long lengthActual = FILE_LENGTH;
        long checksumStart = 100;
        long bytesToRead = lengthActual / 3;
        try (DataReader reader = create(0, Long.MAX_VALUE)) {
            long expectedChecksum =
                    setReadResponses(mClient, lengthActual, checksumStart, bytesToRead - 1);
            long actualChecksum = checkChunks(reader, checksumStart, bytesToRead);
            Assert.assertEquals(expectedChecksum, actualChecksum);
        }
        validateReadRequestSent(mClient, 0, Long.MAX_VALUE, true, CHUNK_SIZE);
    }

    /**
     * Creates a reader through the factory built in {@link #before()}.
     *
     * @param offset the offset passed to the factory
     * @param length the length passed to the factory
     * @return the reader under test
     */
    private DataReader create(long offset, long length) throws Exception {
        return mFactory.create(offset, length);
    }

    /**
     * Drains chunks from the reader and sums the unsigned value of the bytes whose position
     * (counted from the first byte returned by the reader) is at least {@code checksumStart}.
     * Stops at end of stream or once {@code bytesToRead} bytes have been consumed.
     *
     * @param reader the reader to consume
     * @param checksumStart position of the first byte that contributes to the checksum
     * @param bytesToRead number of bytes to consume before returning
     * @return the checksum of the selected bytes
     */
    private long checkChunks(DataReader reader, long checksumStart, long bytesToRead)
            throws Exception {
        long pos = 0;
        long checksum = 0;
        while (true) {
            DataBuffer chunk = reader.readChunk();
            if (chunk == null) {
                return checksum;
            }
            try {
                Assert.assertTrue(chunk instanceof NioDataBuffer);
                ByteBuf buf = (ByteBuf) chunk.getNettyOutput();
                byte[] bytes = new byte[buf.readableBytes()];
                buf.readBytes(bytes);
                for (byte b : bytes) {
                    if (pos >= checksumStart) {
                        checksum += BufferUtils.byteToInt(b);
                    }
                    pos++;
                    if (pos >= bytesToRead) {
                        return checksum;
                    }
                }
            } finally {
                // Single release point: covers the early returns, assertion failures and the
                // normal end of the loop body, so the buffer is released exactly once.
                chunk.release();
            }
        }
    }

    /**
     * Verifies the requests the reader wrote to the request observer: the first one must carry
     * the block id, offset, length and chunk size, and every later one must acknowledge a
     * strictly increasing received offset that does not exceed {@code length}.
     *
     * @param client the mocked block worker client the reader called {@code readBlock} on
     * @param offset the expected offset of the initial request
     * @param length the expected length of the initial request
     * @param closed whether the request stream is expected to have been completed
     * @param chunkSize the expected chunk size of the initial request
     */
    // readBlock takes a generic StreamObserver, which can only be captured as a raw type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void validateReadRequestSent(BlockWorkerClient client, long offset, long length,
            boolean closed, int chunkSize) throws TimeoutException, InterruptedException {
        ArgumentCaptor<ReadRequest> requestCaptor = ArgumentCaptor.forClass(ReadRequest.class);
        Mockito.verify(mRequestObserver, Mockito.atLeastOnce()).onNext(requestCaptor.capture());
        ArgumentCaptor<StreamObserver> observerCaptor =
                ArgumentCaptor.forClass(StreamObserver.class);
        Mockito.verify(client).readBlock(observerCaptor.capture());

        List<ReadRequest> readRequests = requestCaptor.getAllValues();
        Assert.assertFalse(readRequests.isEmpty());
        observerCaptor.getValue().onCompleted();

        long lastOffset = offset;
        for (int i = 0; i < readRequests.size(); i++) {
            ReadRequest readRequest = readRequests.get(i);
            if (i == 0) {
                Assert.assertNotNull(readRequest);
                Assert.assertEquals(BLOCK_ID, readRequest.getBlockId());
                Assert.assertEquals(offset, readRequest.getOffset());
                Assert.assertEquals(length, readRequest.getLength());
                Assert.assertEquals(chunkSize, readRequest.getChunkSize());
            } else {
                Assert.assertTrue(readRequest.hasOffsetReceived());
                Assert.assertTrue(readRequest.getOffsetReceived() > lastOffset);
                Assert.assertTrue(readRequest.getOffsetReceived() <= length);
                lastOffset = readRequest.getOffsetReceived();
            }
        }
        Mockito.verify(mRequestObserver, closed ? Mockito.atLeastOnce() : Mockito.never())
                .onCompleted();
    }

    /**
     * Generates {@code length} random bytes split into chunks of at most {@link #CHUNK_SIZE}
     * bytes and asynchronously delivers them, followed by a completion signal, to the response
     * observer the reader registered through {@code readBlock}.
     *
     * @param client the mocked block worker client the reader called {@code readBlock} on
     * @param length total number of bytes to send
     * @param start position of the first byte included in the checksum (inclusive)
     * @param end position of the last byte included in the checksum (inclusive)
     * @return the sum of the unsigned values of the bytes in {@code [start, end]}
     */
    // readBlock takes a generic StreamObserver, which can only be captured as a raw type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private long setReadResponses(BlockWorkerClient client, long length, long start, long end) {
        ArgumentCaptor<StreamObserver> observerCaptor =
                ArgumentCaptor.forClass(StreamObserver.class);
        Mockito.verify(client).readBlock(observerCaptor.capture());
        StreamObserver<ReadResponse> responseObserver = observerCaptor.getValue();

        long checksum = 0;
        long pos = 0;
        long remaining = length;
        List<ReadResponse> responses = new ArrayList<>();
        while (remaining > 0) {
            int bytesToSend = (int) Math.min(remaining, CHUNK_SIZE);
            byte[] data = new byte[bytesToSend];
            RANDOM.nextBytes(data);
            responses.add(ReadResponse.newBuilder()
                    .setChunk(Chunk.newBuilder().setData(ByteString.copyFrom(data)))
                    .build());
            remaining -= bytesToSend;
            for (byte b : data) {
                if (pos >= start && pos <= end) {
                    checksum += BufferUtils.byteToInt(b);
                }
                pos++;
            }
        }
        // Deliver from another thread so the reader under test can consume the chunks while
        // they are still being produced.
        EXECUTOR.submit(() -> {
            for (ReadResponse response : responses) {
                responseObserver.onNext(response);
            }
            responseObserver.onCompleted();
        });
        return checksum;
    }
}
