package alluxio.worker.netty;

import alluxio.network.protocol.RPCFileReadRequest;
import alluxio.network.protocol.RPCFileReadResponse;
import alluxio.network.protocol.RPCFileWriteRequest;
import alluxio.network.protocol.RPCFileWriteResponse;
import alluxio.network.protocol.RPCResponse;
import alluxio.network.protocol.databuffer.DataByteBuffer;
import alluxio.worker.file.FileSystemWorker;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/worker/netty/UnderFileSystemDataServerHandler.class */
public class UnderFileSystemDataServerHandler {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private final FileSystemWorker mWorker;

    public UnderFileSystemDataServerHandler(FileSystemWorker fileSystemWorker) {
        this.mWorker = fileSystemWorker;
    }

    public void handleFileReadRequest(ChannelHandlerContext channelHandlerContext, RPCFileReadRequest rPCFileReadRequest) throws IOException {
        int read;
        rPCFileReadRequest.validate();
        long tempUfsFileId = rPCFileReadRequest.getTempUfsFileId();
        long offset = rPCFileReadRequest.getOffset();
        long length = rPCFileReadRequest.getLength();
        byte[] bArr = new byte[(int) length];
        try {
            InputStream ufsInputStream = this.mWorker.getUfsInputStream(tempUfsFileId, offset);
            int i = 0;
            if (ufsInputStream != null) {
                while (i < length && (read = ufsInputStream.read(bArr, i, ((int) length) - i)) != -1) {
                    i += read;
                }
            }
            channelHandlerContext.writeAndFlush(new RPCFileReadResponse(tempUfsFileId, offset, i, i != 0 ? new DataByteBuffer(ByteBuffer.wrap(bArr, 0, i), i) : null, RPCResponse.Status.SUCCESS)).addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
            LOG.error("Failed to read ufs file, may have been closed due to a client timeout.", e);
            channelHandlerContext.writeAndFlush(RPCFileReadResponse.createErrorResponse(rPCFileReadRequest, RPCResponse.Status.UFS_READ_FAILED)).addListener(ChannelFutureListener.CLOSE);
        }
    }

    public void handleFileWriteRequest(ChannelHandlerContext channelHandlerContext, RPCFileWriteRequest rPCFileWriteRequest) throws IOException {
        long tempUfsFileId = rPCFileWriteRequest.getTempUfsFileId();
        long offset = rPCFileWriteRequest.getOffset();
        long length = rPCFileWriteRequest.getLength();
        try {
            Channels.newChannel(this.mWorker.getUfsOutputStream(tempUfsFileId)).write(rPCFileWriteRequest.getPayloadDataBuffer().getReadOnlyByteBuffer());
            channelHandlerContext.writeAndFlush(new RPCFileWriteResponse(tempUfsFileId, offset, length, RPCResponse.Status.SUCCESS)).addListener(ChannelFutureListener.CLOSE);
        } catch (Exception e) {
            LOG.error("Failed to write ufs file.", e);
            channelHandlerContext.writeAndFlush(RPCFileWriteResponse.createErrorResponse(rPCFileWriteRequest, RPCResponse.Status.UFS_WRITE_FAILED)).addListener(ChannelFutureListener.CLOSE);
        }
    }
}
