package alluxio.worker.netty;

import alluxio.ConfigurationRule;
import alluxio.PropertyKey;
import alluxio.client.netty.ClientHandler;
import alluxio.client.netty.NettyClient;
import alluxio.client.netty.SingleResponseListener;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.network.protocol.RPCBlockReadRequest;
import alluxio.network.protocol.RPCBlockWriteRequest;
import alluxio.network.protocol.RPCFileReadRequest;
import alluxio.network.protocol.RPCFileWriteRequest;
import alluxio.network.protocol.RPCRequest;
import alluxio.network.protocol.RPCResponse;
import alluxio.network.protocol.databuffer.DataByteArrayChannel;
import alluxio.worker.AlluxioWorkerService;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.MockBlockReader;
import alluxio.worker.block.io.MockBlockWriter;
import alluxio.worker.file.FileSystemWorker;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:alluxio/worker/netty/NettyDataServerTest.class */
/**
 * Tests for {@link NettyDataServer}.
 *
 * <p>Each test starts a server backed by a mocked {@link BlockWorker} and {@link FileSystemWorker},
 * sends a single RPC request to it over a real Netty client channel, and checks the response
 * status and/or payload.
 */
public final class NettyDataServerTest {
    private NettyDataServer mNettyDataServer;
    private BlockWorker mBlockWorker;
    private FileSystemWorker mFileSystemWorker;

    /**
     * Overrides the Netty shutdown quiet period with "0" for the duration of each test
     * (presumably so that closing the server does not slow the tests down).
     */
    @Rule
    public ConfigurationRule mRule = new ConfigurationRule(
            ImmutableMap.of(PropertyKey.WORKER_NETWORK_NETTY_SHUTDOWN_QUIET_PERIOD, "0"));

    /**
     * Creates the mocked workers and starts a server wired to them.
     */
    @Before
    public void before() {
        this.mBlockWorker = Mockito.mock(BlockWorker.class);
        this.mFileSystemWorker = Mockito.mock(FileSystemWorker.class);

        AlluxioWorkerService workerService = Mockito.mock(AlluxioWorkerService.class);
        Mockito.when(workerService.getBlockWorker()).thenReturn(this.mBlockWorker);
        Mockito.when(workerService.getFileSystemWorker()).thenReturn(this.mFileSystemWorker);

        // Port 0 lets the system pick a free port; tests read it back via getPort().
        this.mNettyDataServer = new NettyDataServer(new InetSocketAddress(0), workerService);
    }

    /**
     * Closes the server started in {@link #before()}.
     *
     * @throws Exception if closing the server fails
     */
    @After
    public void after() throws Exception {
        this.mNettyDataServer.close();
    }

    /**
     * Closes the server inside the test body; {@link #after()} then closes it a second time.
     */
    @Test
    public void close() throws Exception {
        this.mNettyDataServer.close();
    }

    /**
     * Checks that the server reports a positive port after being bound to port 0.
     */
    @Test
    public void port() {
        Assert.assertTrue(this.mNettyDataServer.getPort() > 0);
    }

    /**
     * Reads part of a block and checks the returned bytes.
     */
    @Test
    public void readBlock() throws Exception {
        MockBlockReader blockReader = new MockBlockReader("abcdefg".getBytes(Charsets.UTF_8));
        Mockito.when(this.mBlockWorker.readBlockRemote(0L, 1L, 4L)).thenReturn(blockReader);

        RPCResponse response = request(new RPCBlockReadRequest(1L, 2L, 3L, 4L, 0L));

        // Presumably offset 2 / length 3 of "abcdefg" -- confirm against RPCBlockReadRequest.
        String payload = Charsets.UTF_8
                .decode(response.getPayloadDataBuffer().getReadOnlyByteBuffer())
                .toString();
        Assert.assertEquals("cde", payload);
    }

    /**
     * A runtime exception from the block worker on read yields UFS_READ_FAILED.
     */
    @Test
    public void blockWorkerExceptionCausesReadFailedStatus() throws Exception {
        Mockito.when(this.mBlockWorker.readBlockRemote(
                Matchers.anyLong(), Matchers.anyLong(), Matchers.anyLong()))
                .thenThrow(new RuntimeException());

        RPCResponse response = request(new RPCBlockReadRequest(1L, 2L, 3L, 4L, 0L));

        Assert.assertEquals(RPCResponse.Status.UFS_READ_FAILED, response.getStatus());
    }

    /**
     * A {@link BlockDoesNotExistException} from the block worker on read yields FILE_DNE.
     */
    @Test
    public void blockWorkerBlockDoesNotExistExceptionCausesFileDneStatus() throws Exception {
        Mockito.when(this.mBlockWorker.readBlockRemote(
                Matchers.anyLong(), Matchers.anyLong(), Matchers.anyLong()))
                .thenThrow(new BlockDoesNotExistException(""));

        RPCResponse response = request(new RPCBlockReadRequest(1L, 2L, 3L, 4L, 0L));

        Assert.assertEquals(RPCResponse.Status.FILE_DNE, response.getStatus());
    }

    /**
     * A runtime exception from the block worker on read yields UFS_READ_FAILED.
     *
     * <p>NOTE(review): this repeats the scenario of
     * {@link #blockWorkerExceptionCausesReadFailedStatus()}; kept so the set of test names
     * is unchanged.
     */
    @Test
    public void blockWorkerExceptionCausesFailStatusOnRead() throws Exception {
        Mockito.when(this.mBlockWorker.readBlockRemote(
                Matchers.anyLong(), Matchers.anyLong(), Matchers.anyLong()))
                .thenThrow(new RuntimeException());

        RPCResponse response = request(new RPCBlockReadRequest(1L, 2L, 3L, 4L, 0L));

        Assert.assertEquals(RPCResponse.Status.UFS_READ_FAILED, response.getStatus());
    }

    /**
     * A write request whose third argument is 0 creates the block and writes the data.
     */
    @Test
    public void writeNewBlock() throws Exception {
        DataByteArrayChannel data =
                new DataByteArrayChannel("abc".getBytes(Charsets.UTF_8), 0L, 3L);
        MockBlockWriter blockWriter = new MockBlockWriter();
        Mockito.when(this.mBlockWorker.getTempBlockWriterRemote(0L, 1L)).thenReturn(blockWriter);

        RPCResponse response = request(new RPCBlockWriteRequest(0L, 1L, 0L, 2L, data));

        Assert.assertEquals(RPCResponse.Status.SUCCESS, response.getStatus());
        Mockito.verify(this.mBlockWorker).createBlockRemote(0L, 1L, "MEM", 2L);
        Assert.assertEquals("ab", new String(blockWriter.getBytes(), Charsets.UTF_8));
    }

    /**
     * A write request whose third argument is non-zero requests space instead of creating
     * the block, then writes the data.
     */
    @Test
    public void writeExistingBlock() throws Exception {
        DataByteArrayChannel data =
                new DataByteArrayChannel("abc".getBytes(Charsets.UTF_8), 0L, 3L);
        MockBlockWriter blockWriter = new MockBlockWriter();
        Mockito.when(this.mBlockWorker.getTempBlockWriterRemote(0L, 1L)).thenReturn(blockWriter);

        RPCResponse response = request(new RPCBlockWriteRequest(0L, 1L, 1L, 2L, data));

        Assert.assertEquals(RPCResponse.Status.SUCCESS, response.getStatus());
        Mockito.verify(this.mBlockWorker).requestSpace(0L, 1L, 2L);
        Assert.assertEquals("ab", new String(blockWriter.getBytes(), Charsets.UTF_8));
    }

    /**
     * A runtime exception from the block worker on write yields WRITE_ERROR.
     */
    @Test
    public void blockWorkerExceptionCausesFailStatusOnWrite() throws Exception {
        Mockito.when(this.mBlockWorker.getTempBlockWriterRemote(0L, 1L))
                .thenThrow(new RuntimeException());
        DataByteArrayChannel data =
                new DataByteArrayChannel("abc".getBytes(Charsets.UTF_8), 0L, 3L);

        RPCResponse response = request(new RPCBlockWriteRequest(0L, 1L, 0L, 2L, data));

        Assert.assertEquals(RPCResponse.Status.WRITE_ERROR, response.getStatus());
    }

    /**
     * Reads a file through the file system worker's UFS input stream and checks the bytes.
     */
    @Test
    public void readFile() throws Exception {
        ByteArrayInputStream ufsInput = new ByteArrayInputStream("abc".getBytes(Charsets.UTF_8));
        Mockito.when(this.mFileSystemWorker.getUfsInputStream(1L, 0L)).thenReturn(ufsInput);

        RPCResponse response = request(new RPCFileReadRequest(1L, 0L, 3L));

        String payload = Charsets.UTF_8
                .decode(response.getPayloadDataBuffer().getReadOnlyByteBuffer())
                .toString();
        Assert.assertEquals("abc", payload);
    }

    /**
     * A runtime exception from the file system worker on read yields UFS_READ_FAILED.
     */
    @Test
    public void fileSystemWorkerExceptionCausesFailStatusOnRead() throws Exception {
        Mockito.when(this.mFileSystemWorker.getUfsInputStream(1L, 0L))
                .thenThrow(new RuntimeException());

        RPCResponse response = request(new RPCFileReadRequest(1L, 0L, 3L));

        Assert.assertEquals(RPCResponse.Status.UFS_READ_FAILED, response.getStatus());
    }

    /**
     * Writes a file through the file system worker's UFS output stream and checks the bytes.
     */
    @Test
    public void writeFile() throws Exception {
        DataByteArrayChannel data =
                new DataByteArrayChannel("abc".getBytes(Charsets.UTF_8), 0L, 3L);
        ByteArrayOutputStream ufsOutput = new ByteArrayOutputStream();
        Mockito.when(this.mFileSystemWorker.getUfsOutputStream(1L)).thenReturn(ufsOutput);

        RPCResponse response = request(new RPCFileWriteRequest(1L, 0L, 3L, data));

        Assert.assertEquals(RPCResponse.Status.SUCCESS, response.getStatus());
        Assert.assertEquals("abc", new String(ufsOutput.toByteArray(), Charsets.UTF_8));
    }

    /**
     * A runtime exception from the file system worker on write yields UFS_WRITE_FAILED.
     */
    @Test
    public void fileSystemWorkerExceptionCausesFailStatusOnWrite() throws Exception {
        DataByteArrayChannel data =
                new DataByteArrayChannel("abc".getBytes(Charsets.UTF_8), 0L, 3L);
        Mockito.when(this.mFileSystemWorker.getUfsOutputStream(1L))
                .thenThrow(new RuntimeException());

        RPCResponse response = request(new RPCFileWriteRequest(1L, 0L, 3L, data));

        Assert.assertEquals(RPCResponse.Status.UFS_WRITE_FAILED, response.getStatus());
    }

    /**
     * Sends one request to the server under test and waits for its response.
     *
     * @param rPCRequest the request to send
     * @return the response delivered to the client handler's listener
     * @throws Exception if connecting, waiting for the response, or closing the channel fails
     */
    private RPCResponse request(RPCRequest rPCRequest) throws Exception {
        InetSocketAddress serverAddress = new InetSocketAddress(
                this.mNettyDataServer.getBindHost(), this.mNettyDataServer.getPort());
        Channel channel = NettyClient.createClientBootstrap().connect(serverAddress).sync().channel();
        try {
            // Keep a typed reference to the handler: ChannelPipeline.last() only returns a
            // ChannelHandler, which has no addListener method.
            ClientHandler clientHandler = new ClientHandler();
            channel.pipeline().addLast(clientHandler);

            SingleResponseListener responseListener = new SingleResponseListener();
            clientHandler.addListener(responseListener);

            channel.writeAndFlush(rPCRequest);
            return responseListener.get(NettyClient.TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } finally {
            // Always release the client channel, whether or not a response arrived.
            channel.close().sync();
        }
    }
}
