package alluxio.worker.netty;

import alluxio.Configuration;
import alluxio.StorageTierAssoc;
import alluxio.WorkerStorageTierAssoc;
import alluxio.exception.BlockDoesNotExistException;
import alluxio.exception.InvalidWorkerStateException;
import alluxio.network.protocol.RPCBlockReadRequest;
import alluxio.network.protocol.RPCBlockReadResponse;
import alluxio.network.protocol.RPCBlockWriteRequest;
import alluxio.network.protocol.RPCBlockWriteResponse;
import alluxio.network.protocol.RPCResponse;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataByteBuffer;
import alluxio.network.protocol.databuffer.DataFileChannel;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/worker/netty/BlockDataServerHandler.class */
public final class BlockDataServerHandler {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private final BlockWorker mWorker;
    private final StorageTierAssoc mStorageTierAssoc = new WorkerStorageTierAssoc();
    private final FileTransferType mTransferType = (FileTransferType) Configuration.getEnum("alluxio.worker.network.netty.file.transfer", FileTransferType.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    public BlockDataServerHandler(BlockWorker blockWorker) {
        this.mWorker = blockWorker;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleBlockReadRequest(ChannelHandlerContext channelHandlerContext, RPCBlockReadRequest rPCBlockReadRequest) throws IOException {
        long blockId = rPCBlockReadRequest.getBlockId();
        long offset = rPCBlockReadRequest.getOffset();
        long length = rPCBlockReadRequest.getLength();
        long lockId = rPCBlockReadRequest.getLockId();
        long sessionId = rPCBlockReadRequest.getSessionId();
        try {
            BlockReader readBlockRemote = this.mWorker.readBlockRemote(sessionId, blockId, lockId);
            try {
                rPCBlockReadRequest.validate();
                long length2 = readBlockRemote.getLength();
                validateBounds(rPCBlockReadRequest, length2);
                long returnLength = returnLength(offset, length, length2);
                DataBuffer dataBuffer = getDataBuffer(rPCBlockReadRequest, readBlockRemote, returnLength);
                ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new RPCBlockReadResponse(blockId, offset, returnLength, dataBuffer, RPCResponse.Status.SUCCESS));
                writeAndFlush.addListener(ChannelFutureListener.CLOSE);
                writeAndFlush.addListener(new ClosableResourceChannelListener(readBlockRemote));
                writeAndFlush.addListener(new ReleasableResourceChannelListener(dataBuffer));
                this.mWorker.accessBlock(sessionId, blockId);
                LOG.info("Preparation for responding to remote block request for: {} done.", Long.valueOf(blockId));
            } catch (Exception e) {
                LOG.error("The file is not here : {}", e.getMessage(), e);
                channelHandlerContext.writeAndFlush(RPCBlockReadResponse.createErrorResponse(rPCBlockReadRequest, RPCResponse.Status.FILE_DNE)).addListener(ChannelFutureListener.CLOSE);
                if (readBlockRemote != null) {
                    readBlockRemote.close();
                }
            }
        } catch (BlockDoesNotExistException | InvalidWorkerStateException e2) {
            throw new IOException((Throwable) e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleBlockWriteRequest(ChannelHandlerContext channelHandlerContext, RPCBlockWriteRequest rPCBlockWriteRequest) throws IOException {
        long sessionId = rPCBlockWriteRequest.getSessionId();
        long blockId = rPCBlockWriteRequest.getBlockId();
        long offset = rPCBlockWriteRequest.getOffset();
        long length = rPCBlockWriteRequest.getLength();
        DataBuffer payloadDataBuffer = rPCBlockWriteRequest.getPayloadDataBuffer();
        BlockWriter blockWriter = null;
        try {
            rPCBlockWriteRequest.validate();
            ByteBuffer readOnlyByteBuffer = payloadDataBuffer.getReadOnlyByteBuffer();
            if (offset == 0) {
                this.mWorker.createBlockRemote(sessionId, blockId, this.mStorageTierAssoc.getAlias(0), length);
            } else {
                this.mWorker.requestSpace(sessionId, blockId, length);
            }
            blockWriter = this.mWorker.getTempBlockWriterRemote(sessionId, blockId);
            blockWriter.append(readOnlyByteBuffer);
            ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new RPCBlockWriteResponse(sessionId, blockId, offset, length, RPCResponse.Status.SUCCESS));
            writeAndFlush.addListener(ChannelFutureListener.CLOSE);
            writeAndFlush.addListener(new ClosableResourceChannelListener(blockWriter));
        } catch (Exception e) {
            LOG.error("Error writing remote block : {}", e.getMessage(), e);
            channelHandlerContext.writeAndFlush(RPCBlockWriteResponse.createErrorResponse(rPCBlockWriteRequest, RPCResponse.Status.WRITE_ERROR)).addListener(ChannelFutureListener.CLOSE);
            if (blockWriter != null) {
                blockWriter.close();
            }
        }
    }

    private long returnLength(long j, long j2, long j3) {
        return j2 == -1 ? j3 - j : j2;
    }

    private void validateBounds(RPCBlockReadRequest rPCBlockReadRequest, long j) {
        Preconditions.checkArgument(rPCBlockReadRequest.getOffset() <= j, "Offset(%s) is larger than file length(%s)", new Object[]{Long.valueOf(rPCBlockReadRequest.getOffset()), Long.valueOf(j)});
        Preconditions.checkArgument(rPCBlockReadRequest.getLength() == -1 || rPCBlockReadRequest.getOffset() + rPCBlockReadRequest.getLength() <= j, "Offset(%s) plus length(%s) is larger than file length(%s)", new Object[]{Long.valueOf(rPCBlockReadRequest.getOffset()), Long.valueOf(rPCBlockReadRequest.getLength()), Long.valueOf(j)});
    }

    private DataBuffer getDataBuffer(RPCBlockReadRequest rPCBlockReadRequest, BlockReader blockReader, long j) throws IOException, IllegalArgumentException {
        switch (this.mTransferType) {
            case MAPPED:
                return new DataByteBuffer(blockReader.read(rPCBlockReadRequest.getOffset(), (int) j), j);
            case TRANSFER:
            default:
                if (blockReader.getChannel() instanceof FileChannel) {
                    return new DataFileChannel((FileChannel) blockReader.getChannel(), rPCBlockReadRequest.getOffset(), j);
                }
                blockReader.close();
                throw new IllegalArgumentException("Only FileChannel is supported!");
        }
    }
}
