package alluxio.client.block.stream;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.block.stream.PacketReader;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CanceledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status;
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PacketReader} that streams the byte range described by a {@link Protocol.ReadRequest}
 * from a worker over a Netty channel.
 *
 * <p>The constructor acquires a channel from the {@link FileSystemContext}, appends a
 * {@link PacketReadHandler} to its pipeline and sends the read request. The handler places every
 * received packet (or one of the sentinel buffers below) on {@code mPackets}, and
 * {@link #readPacket()} takes them off that queue. {@link #close()} undoes the pipeline change
 * and hands the channel back to the context exactly once.
 */
@NotThreadSafe
public final class NettyPacketReader implements PacketReader {
    private static final Logger LOG = LoggerFactory.getLogger(NettyPacketReader.class);

    /** Queue size at which the handler turns channel auto-read off. */
    private static final int MAX_PACKETS_IN_FLIGHT = Configuration.getInt(PropertyKey.USER_NETWORK_NETTY_READER_BUFFER_SIZE_PACKETS);
    /** Maximum time {@link #readPacket()} waits on the queue for a single poll. */
    private static final long READ_TIMEOUT_MS = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);

    // Sentinel buffers. They are recognised by reference identity (==), never by content.
    /** Queued when {@code mPacketReaderException} has been set. */
    private static final ByteBuf THROWABLE = Unpooled.buffer(0);
    /** Queued for a UFS read heartbeat response; skipped by {@link #readPacket()}. */
    private static final ByteBuf UFS_READ_HEARTBEAT = Unpooled.buffer(0);
    /** Queued for a response that carries no payload. */
    private static final ByteBuf EOF_OR_CANCELLED = Unpooled.buffer(0);

    private final FileSystemContext mContext;
    private final Channel mChannel;
    private final Protocol.ReadRequest mReadRequest;
    private final WorkerNetAddress mAddress;
    /** Packets and sentinels produced by the handler and consumed by {@link #readPacket()}. */
    private final BlockingQueue<ByteBuf> mPackets = new LinkedBlockingQueue<>();
    /** First failure recorded by the handler; volatile because the reading thread reads it. */
    private volatile Throwable mPacketReaderException;
    /** Offset of the next byte to hand out; starts at the request offset. */
    private long mPosToRead;
    /** Set once {@link #readPacket()} has seen {@code EOF_OR_CANCELLED}. */
    private boolean mDone;
    /** Set once {@link #close()} has run its cleanup. */
    private boolean mClosed;

    /**
     * Factory that creates {@link NettyPacketReader}s for sub-ranges of a partially filled
     * read request.
     */
    public static class Factory implements PacketReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final Protocol.ReadRequest mReadRequestPartial;
        private final InStreamOptions mOptions;

        /**
         * @param fileSystemContext the context channels are acquired from
         * @param workerNetAddress the worker to read from
         * @param readRequest the request template; offset and length are filled in by
         *        {@link #create(long, long)}
         * @param inStreamOptions the in-stream options, passed through to each reader
         */
        public Factory(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, Protocol.ReadRequest readRequest, InStreamOptions inStreamOptions) {
            mContext = fileSystemContext;
            mAddress = workerNetAddress;
            mReadRequestPartial = readRequest;
            mOptions = inStreamOptions;
        }

        /**
         * Creates a reader for the given range.
         *
         * @param j the offset to set on the request
         * @param j2 the length to set on the request
         * @return a new reader whose request has already been sent
         * @throws IOException if constructing the reader fails
         */
        @Override // alluxio.client.block.stream.PacketReader.Factory
        public PacketReader create(long j, long j2) throws IOException {
            Protocol.ReadRequest request = mReadRequestPartial.toBuilder().setOffset(j).setLength(j2).build();
            return new NettyPacketReader(mContext, mAddress, request, mOptions);
        }

        /** @return always {@code false} */
        @Override // alluxio.client.block.stream.PacketReader.Factory
        public boolean isShortCircuit() {
            return false;
        }

        /** No-op: the factory itself holds nothing that needs closing. */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    /**
     * Channel handler that converts worker responses into queue entries for the enclosing reader.
     * NOTE(review): these callbacks are presumably invoked on the channel's Netty event loop
     * thread - confirm against the channel setup.
     */
    private class PacketReadHandler extends ChannelInboundHandlerAdapter {
        PacketReadHandler() {
        }

        /**
         * Classifies one inbound message and queues the matching buffer or sentinel.
         *
         * @throws IllegalStateException if the message is not an {@link RPCProtoMessage} or is
         *         neither a read response nor a generic response
         * @throws IOException declared for {@code CommonUtils.unwrapResponseFrom}
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
            if (!(msg instanceof RPCProtoMessage)) {
                throw new IllegalStateException(String.format("Incorrect response type %s, %s.", msg.getClass().getCanonicalName(), msg));
            }
            RPCProtoMessage response = (RPCProtoMessage) msg;
            ProtoMessage message = response.getMessage();

            ByteBuf packet;
            if (message.isReadResponse()) {
                // The only read response accepted here is the UFS read heartbeat.
                Preconditions.checkState(message.asReadResponse().getType() == Protocol.ReadResponse.Type.UFS_READ_HEARTBEAT);
                packet = UFS_READ_HEARTBEAT;
            } else if (message.isResponse()) {
                // A CANCELED status is not passed to unwrapResponseFrom; any other status is.
                if (message.asResponse().getStatus() != Status.PStatus.CANCELED) {
                    CommonUtils.unwrapResponseFrom(response.getMessage().asResponse(), ctx.channel());
                }
                DataBuffer payload = response.getPayloadDataBuffer();
                if (payload == null) {
                    packet = EOF_OR_CANCELLED;
                } else {
                    Preconditions.checkState(payload.getNettyOutput() instanceof ByteBuf);
                    packet = (ByteBuf) payload.getNettyOutput();
                }
            } else {
                throw new IllegalStateException(String.format("Incorrect response type %s.", message.toString()));
            }

            // Stop auto-reading once the queue is full; readPacket() turns it back on.
            if (tooManyPacketsPending()) {
                NettyUtils.disableAutoRead(ctx.channel());
            }
            mPackets.offer(packet);
        }

        /** Records the first failure, wakes the reader with {@code THROWABLE}, closes the context. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable th) {
            LOG.error("Exception caught while reading from {}.", mReadRequest.getBlockId(), th);
            recordFailure(th);
            ctx.close();
        }

        /** Treats channel unregistration as a failure unless one was already recorded. */
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            LOG.warn("Channel {} is closed while reading from {}.", mChannel, mReadRequest.getBlockId());
            if (mPacketReaderException == null) {
                recordFailure(new IOException(String.format("Channel %s is closed.", mChannel.toString())));
            }
            ctx.fireChannelUnregistered();
        }

        /** Stores {@code failure} and queues {@code THROWABLE}, only if no failure is stored yet. */
        private void recordFailure(Throwable failure) {
            if (mPacketReaderException == null) {
                mPacketReaderException = failure;
                mPackets.offer(THROWABLE);
            }
        }
    }

    /**
     * Acquires a channel to the worker, installs the read handler and sends the read request.
     *
     * @param fileSystemContext the context the channel is acquired from and released to
     * @param workerNetAddress the worker to read from
     * @param readRequest the complete read request (offset and length set)
     * @param inStreamOptions the in-stream options (not read by this constructor)
     * @throws IOException if acquiring the channel fails
     */
    private NettyPacketReader(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, Protocol.ReadRequest readRequest, InStreamOptions inStreamOptions) throws IOException {
        mContext = fileSystemContext;
        mAddress = workerNetAddress;
        mPosToRead = readRequest.getOffset();
        mReadRequest = readRequest;
        mChannel = mContext.acquireNettyChannel(workerNetAddress);
        mChannel.pipeline().addLast(new PacketReadHandler());
        mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(mReadRequest))).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    /** @return the offset of the next byte this reader will hand out */
    @Override // alluxio.client.block.stream.PacketReader
    public long pos() {
        return mPosToRead;
    }

    /**
     * Waits for the next data packet, skipping heartbeat sentinels.
     *
     * @return the next packet wrapped in a {@link DataNettyBufferV2}, or {@code null} once the
     *         {@code EOF_OR_CANCELLED} sentinel is seen
     * @throws CanceledException if the waiting thread is interrupted (the interrupt flag is
     *         restored first)
     * @throws DeadlineExceededException if a single poll yields nothing within
     *         {@code READ_TIMEOUT_MS}
     * @throws IOException if the handler recorded a failure
     * @throws IllegalStateException if the reader is already closed, or more bytes than
     *         requested have been received
     */
    @Override // alluxio.client.block.stream.PacketReader
    public DataBuffer readPacket() throws IOException {
        Preconditions.checkState(!mClosed, "PacketReader is closed while reading packets.");
        // Resume reading from the channel if the queue has room again.
        if (!tooManyPacketsPending()) {
            NettyUtils.enableAutoRead(mChannel);
        }

        ByteBuf packet;
        do {
            try {
                packet = mPackets.poll(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CanceledException(e);
            }
            // Each heartbeat restarts the wait with a fresh timeout.
        } while (packet == UFS_READ_HEARTBEAT);

        if (packet == null) {
            throw new DeadlineExceededException(String.format("Timeout to read %d from %s.", mReadRequest.getBlockId(), mChannel.toString()));
        }
        if (packet == THROWABLE) {
            Preconditions.checkNotNull(mPacketReaderException, "mPacketReaderException");
            // Rethrows IOExceptions and unchecked throwables as-is; anything else is wrapped.
            Throwables.propagateIfPossible(mPacketReaderException, IOException.class);
            throw AlluxioStatusException.fromCheckedException(mPacketReaderException);
        }
        if (packet == EOF_OR_CANCELLED) {
            mDone = true;
            return null;
        }

        mPosToRead += packet.readableBytes();
        Preconditions.checkState(mPosToRead - mReadRequest.getOffset() <= mReadRequest.getLength());
        return new DataNettyBufferV2(packet);
    }

    /**
     * Closes the reader. If the stream is unfinished and the channel is still open, a cancel
     * request is sent when bytes remain and the pending packets are drained. The channel is
     * released back to the context exactly once regardless of the path taken. Calling this
     * again after it has completed is a no-op.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (mClosed) {
            return;
        }
        try {
            // Nothing to cancel or drain when the stream finished or the channel is gone.
            if (mDone || !mChannel.isOpen()) {
                return;
            }
            if (remaining() > 0) {
                Protocol.ReadRequest cancelRequest = mReadRequest.toBuilder().setCancel(true).build();
                mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(cancelRequest))).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
            try {
                readAndDiscardAll();
            } catch (IOException e) {
                LOG.warn("Failed to close the NettyBlockReader (block: {}, address: {}) with exception {}.", mReadRequest.getBlockId(), mAddress, e.getMessage());
                CommonUtils.closeChannel(mChannel);
            }
        } finally {
            // Single cleanup point: doing this on each exit path as well would release the
            // channel twice and remove a second, unrelated handler from the pipeline.
            releaseChannel();
        }
    }

    /**
     * Removes this reader's handler (the last one in the pipeline) and re-enables auto-read if
     * the channel is still open, then returns the channel to the context and marks the reader
     * closed.
     */
    private void releaseChannel() {
        if (mChannel.isOpen()) {
            mChannel.pipeline().removeLast();
            NettyUtils.enableAutoRead(mChannel);
        }
        mContext.releaseNettyChannel(mAddress, mChannel);
        mClosed = true;
    }

    /**
     * Reads and releases packets until {@link #readPacket()} returns {@code null}.
     *
     * @throws IOException if reading a packet fails
     */
    private void readAndDiscardAll() throws IOException {
        DataBuffer packet;
        while ((packet = readPacket()) != null) {
            packet.release();
        }
    }

    /** @return the number of requested bytes that have not been handed out yet */
    private long remaining() {
        return (mReadRequest.getOffset() + mReadRequest.getLength()) - mPosToRead;
    }

    /**
     * Visibility is kept as found in this file.
     *
     * @return whether the queue holds at least {@code MAX_PACKETS_IN_FLIGHT} entries
     */
    public boolean tooManyPacketsPending() {
        return mPackets.size() >= MAX_PACKETS_IN_FLIGHT;
    }
}
