package alluxio.client.block.stream;

import alluxio.ClientContext;
import alluxio.ConfigurationRule;
import alluxio.ConfigurationTestUtils;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.grpc.Chunk;
import alluxio.grpc.RequestType;
import alluxio.grpc.WriteRequest;
import alluxio.util.CommonUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.io.BufferUtils;
import alluxio.wire.WorkerNetAddress;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({FileSystemContext.class, WorkerNetAddress.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:alluxio/client/block/stream/UfsFallbackLocalFileDataWriterTest.class */
public class UfsFallbackLocalFileDataWriterTest {
    private static final int CHUNK_SIZE = 1024;
    private static final long BLOCK_ID = 1;
    private static final long MOUNT_ID = 9;
    private ByteBuffer mBuffer;
    private FixedCapacityTestDataWriter mLocalWriter;
    private ClientContext mClientContext;
    private FileSystemContext mContext;
    private WorkerNetAddress mAddress;
    private BlockWorkerClient mClient;
    private ClientCallStreamObserver<WriteRequest> mRequestObserver;
    private InstancedConfiguration mConf = ConfigurationTestUtils.defaults();

    @Rule
    public ConfigurationRule mConfigurationRule = new ConfigurationRule(PropertyKey.USER_STREAMING_WRITER_CHUNK_SIZE_BYTES, String.valueOf(CHUNK_SIZE), this.mConf);
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataWriterTest.class);
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4, ThreadFactoryUtils.build("test-executor-%d", true));
    private static final Random RANDOM = new Random();

    /* loaded from: input_file:alluxio/client/block/stream/UfsFallbackLocalFileDataWriterTest$FixedCapacityTestDataWriter.class */
    public static class FixedCapacityTestDataWriter extends TestDataWriter {
        private final long mCapacity;
        private final ByteBuffer mBuffer;
        private boolean mIsLocalWorkerFull;
        private boolean mClosed;
        private boolean mCanceled;

        public FixedCapacityTestDataWriter(ByteBuffer byteBuffer) {
            super(byteBuffer);
            this.mIsLocalWorkerFull = false;
            this.mClosed = false;
            this.mCanceled = false;
            this.mCapacity = byteBuffer.capacity();
            this.mBuffer = byteBuffer;
        }

        @Override // alluxio.client.block.stream.TestDataWriter
        public void writeChunk(ByteBuf byteBuf) throws IOException {
            if (pos() + byteBuf.readableBytes() > this.mCapacity) {
                this.mIsLocalWorkerFull = true;
            }
            if (this.mIsLocalWorkerFull) {
                throw new ResourceExhaustedException("no more space!");
            }
            synchronized (this.mBuffer) {
                super.writeChunk(byteBuf);
            }
        }

        @Override // alluxio.client.block.stream.TestDataWriter
        public void close() {
            super.close();
            if (this.mClosed) {
                return;
            }
            this.mClosed = true;
        }

        @Override // alluxio.client.block.stream.TestDataWriter
        public void cancel() {
            super.cancel();
            if (this.mCanceled) {
                return;
            }
            this.mCanceled = true;
            this.mClosed = true;
        }

        public boolean isClosed() {
            return this.mClosed;
        }

        public boolean isCanceled() {
            return this.mCanceled;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:alluxio/client/block/stream/UfsFallbackLocalFileDataWriterTest$WriteSummary.class */
    public class WriteSummary {
        private final long mBytes;
        private final long mChecksum;

        public WriteSummary(long j, long j2) {
            this.mBytes = j;
            this.mChecksum = j2;
        }

        public long getBytes() {
            return this.mBytes;
        }

        public long getChecksum() {
            return this.mChecksum;
        }
    }

    @Before
    public void before() throws Exception {
        this.mClientContext = ClientContext.create(this.mConf);
        this.mContext = (FileSystemContext) PowerMockito.mock(FileSystemContext.class);
        this.mAddress = (WorkerNetAddress) Mockito.mock(WorkerNetAddress.class);
        this.mClient = (BlockWorkerClient) Mockito.mock(BlockWorkerClient.class);
        this.mRequestObserver = (ClientCallStreamObserver) Mockito.mock(ClientCallStreamObserver.class);
        PowerMockito.when(this.mContext.getClientContext()).thenReturn(this.mClientContext);
        PowerMockito.when(this.mContext.getClusterConf()).thenReturn(this.mConf);
        PowerMockito.when(this.mContext.acquireBlockWorkerClient(this.mAddress)).thenReturn(new NoopClosableResource(this.mClient));
        PowerMockito.when(this.mClient.writeBlock((StreamObserver) Matchers.any(StreamObserver.class))).thenReturn(this.mRequestObserver);
        PowerMockito.when(Boolean.valueOf(this.mRequestObserver.isReady())).thenReturn(true);
    }

    @After
    public void after() throws Exception {
        this.mClient.close();
    }

    private DataWriter create(long j, long j2) throws Exception {
        this.mBuffer = ByteBuffer.allocate((int) j2);
        this.mLocalWriter = new FixedCapacityTestDataWriter(this.mBuffer);
        return new UfsFallbackLocalFileDataWriter(this.mLocalWriter, (GrpcDataWriter) null, this.mContext, this.mAddress, BLOCK_ID, j, OutStreamOptions.defaults(this.mClientContext).setMountId(MOUNT_ID));
    }

    @Test
    public void emptyBlock() throws Exception {
        DataWriter create = create(BLOCK_ID, BLOCK_ID);
        try {
            create.flush();
            Assert.assertEquals(0L, create.pos());
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(0L, this.mBuffer.position());
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void noFallback() throws Exception {
        DataWriter create = create(1048917L, 1048917L);
        try {
            Future<WriteSummary> writeData = writeData(create, 1048917L);
            Future<WriteSummary> localWrite = getLocalWrite(this.mBuffer);
            writeData.get();
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(writeData.get().getBytes(), localWrite.get().getBytes());
            Assert.assertEquals(writeData.get().getChecksum(), localWrite.get().getChecksum());
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    @Ignore("Flaky test")
    public void fallbackOnFirstChunk() throws Exception {
        DataWriter create = create(1048917L, BLOCK_ID);
        try {
            Future<WriteSummary> writeData = writeData(create, 1048917L);
            Future<WriteSummary> localWrite = getLocalWrite(this.mBuffer);
            Future<WriteSummary> ufsWrite = getUfsWrite(this.mClient);
            writeData.get();
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(1048917L, writeData.get().getBytes());
            Assert.assertEquals(0L, localWrite.get().getBytes());
            Assert.assertEquals(1048917L, ufsWrite.get().getBytes());
            Assert.assertEquals(writeData.get().getBytes(), ufsWrite.get().getBytes());
            Assert.assertEquals(writeData.get().getChecksum(), ufsWrite.get().getChecksum());
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    @Ignore("Flaky test")
    public void fallbackOnSecondChunk() throws Exception {
        DataWriter create = create(1048917L, 1024L);
        try {
            Future<WriteSummary> writeData = writeData(create, 1048917L);
            Future<WriteSummary> localWrite = getLocalWrite(this.mBuffer);
            Future<WriteSummary> ufsWrite = getUfsWrite(this.mClient);
            writeData.get();
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(1048917L, writeData.get().getBytes());
            Assert.assertEquals(1024L, localWrite.get().getBytes());
            Assert.assertEquals(1048917 - 1024, ufsWrite.get().getBytes());
            Assert.assertEquals(writeData.get().getChecksum(), localWrite.get().getChecksum() + ufsWrite.get().getChecksum());
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void fallbackOnLastChunk() throws Exception {
        DataWriter create = create(1048917L, 1048576L);
        try {
            Future<WriteSummary> writeData = writeData(create, 1048917L);
            writeData.get();
            Future<WriteSummary> localWrite = getLocalWrite(this.mBuffer);
            Future<WriteSummary> ufsWrite = getUfsWrite(this.mClient);
            if (create != null) {
                create.close();
            }
            Assert.assertEquals(1048917L, writeData.get().getBytes());
            Assert.assertEquals(1048576L, localWrite.get().getBytes());
            Assert.assertEquals(1048917 - 1048576, ufsWrite.get().getBytes());
            Assert.assertEquals(writeData.get().getChecksum(), localWrite.get().getChecksum() + ufsWrite.get().getChecksum());
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    @Ignore("Flaky test")
    public void flush() throws Exception {
        DataWriter create = create(1048917L, 1024L);
        try {
            writeData(create, 1024L).get();
            create.flush();
            Assert.assertEquals(1024L, this.mBuffer.position());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test(timeout = 60000)
    @Ignore("Flaky test")
    public void pos() throws Exception {
        DataWriter create = create(2389L, 1024L);
        try {
            byte[] bArr = new byte[1];
            Future<WriteSummary> ufsWrite = getUfsWrite(this.mClient);
            for (long j = 0; j < 2389; j += BLOCK_ID) {
                Assert.assertEquals(j, create.pos());
                create.writeChunk(Unpooled.wrappedBuffer(bArr));
            }
            ufsWrite.get();
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private Future<WriteSummary> writeData(final DataWriter dataWriter, final long j) throws Exception {
        return EXECUTOR.submit(new Callable<WriteSummary>() { // from class: alluxio.client.block.stream.UfsFallbackLocalFileDataWriterTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public WriteSummary call() throws IOException {
                try {
                    long j2 = 0;
                    long j3 = j;
                    while (j3 > 0) {
                        int min = (int) Math.min(j3, 1024L);
                        byte[] bArr = new byte[min];
                        UfsFallbackLocalFileDataWriterTest.RANDOM.nextBytes(bArr);
                        try {
                            dataWriter.writeChunk(Unpooled.wrappedBuffer(bArr));
                            j3 -= min;
                            for (byte b : bArr) {
                                j2 += BufferUtils.byteToInt(b);
                            }
                        } catch (Exception e) {
                            Assert.fail(e.getMessage());
                            throw e;
                        }
                    }
                    return new WriteSummary(j, j2);
                } catch (Throwable th) {
                    UfsFallbackLocalFileDataWriterTest.LOG.error("Failed to write file.", th);
                    Assert.fail("Failed to write file." + th.getMessage());
                    throw th;
                }
            }
        });
    }

    private Future<WriteSummary> getUfsWrite(BlockWorkerClient blockWorkerClient) {
        return EXECUTOR.submit(new Callable<WriteSummary>() { // from class: alluxio.client.block.stream.UfsFallbackLocalFileDataWriterTest.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public WriteSummary call() throws TimeoutException, InterruptedException {
                try {
                    ArgumentCaptor forClass = ArgumentCaptor.forClass(WriteRequest.class);
                    ((ClientCallStreamObserver) Mockito.verify(UfsFallbackLocalFileDataWriterTest.this.mRequestObserver, Mockito.atLeastOnce())).onNext((WriteRequest) forClass.capture());
                    ArgumentCaptor forClass2 = ArgumentCaptor.forClass(StreamObserver.class);
                    ((BlockWorkerClient) Mockito.verify(UfsFallbackLocalFileDataWriterTest.this.mClient)).writeBlock((StreamObserver) forClass2.capture());
                    ((StreamObserver) forClass2.getValue()).onCompleted();
                    long j = 0;
                    long j2 = 0;
                    long j3 = 0;
                    for (WriteRequest writeRequest : forClass.getAllValues()) {
                        UfsFallbackLocalFileDataWriterTest.this.validateWriteRequest(writeRequest, j2);
                        if (writeRequest.hasCommand() && writeRequest.getCommand().getCreateUfsBlockOptions().hasBytesInBlockStore()) {
                            j2 += writeRequest.getCommand().getCreateUfsBlockOptions().getBytesInBlockStore();
                        } else if (writeRequest.hasChunk()) {
                            Chunk chunk = writeRequest.getChunk();
                            Assert.assertTrue(chunk.hasData());
                            int length = chunk.getData().toByteArray().length;
                            for (int i = 0; i < length; i++) {
                                j += BufferUtils.byteToInt(r0[i]);
                                j2 += UfsFallbackLocalFileDataWriterTest.BLOCK_ID;
                                j3 += UfsFallbackLocalFileDataWriterTest.BLOCK_ID;
                            }
                        }
                    }
                    return new WriteSummary(j3, j);
                } catch (Throwable th) {
                    Assert.fail("Failed to verify write requests." + th.getMessage());
                    throw th;
                }
            }
        });
    }

    private Future<WriteSummary> getLocalWrite(final ByteBuffer byteBuffer) {
        return EXECUTOR.submit(new Callable<WriteSummary>() { // from class: alluxio.client.block.stream.UfsFallbackLocalFileDataWriterTest.3
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public WriteSummary call() throws TimeoutException, InterruptedException {
                long j = 0;
                long j2 = 0;
                CommonUtils.waitFor("Writing to local completes", () -> {
                    return Boolean.valueOf(UfsFallbackLocalFileDataWriterTest.this.mLocalWriter.isClosed());
                });
                synchronized (byteBuffer) {
                    int position = byteBuffer.position();
                    while (j2 < position) {
                        j += BufferUtils.byteToInt(byteBuffer.get((int) j2));
                        j2 += UfsFallbackLocalFileDataWriterTest.BLOCK_ID;
                    }
                }
                return new WriteSummary(j2, j);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void validateWriteRequest(WriteRequest writeRequest, long j) {
        if (!writeRequest.hasCommand()) {
            Assert.assertTrue(writeRequest.hasChunk());
            return;
        }
        Assert.assertEquals(RequestType.UFS_FALLBACK_BLOCK, writeRequest.getCommand().getType());
        Assert.assertEquals(BLOCK_ID, writeRequest.getCommand().getId());
        Assert.assertEquals(j, writeRequest.getCommand().getOffset());
        Assert.assertTrue(writeRequest.getCommand().hasCreateUfsBlockOptions());
        Assert.assertEquals(MOUNT_ID, writeRequest.getCommand().getCreateUfsBlockOptions().getMountId());
    }
}
