package alluxio.client.block.stream;

import alluxio.Seekable;
import alluxio.client.BoundedStream;
import alluxio.client.PositionedReadable;
import alluxio.client.ReadType;
import alluxio.client.block.stream.DataReader;
import alluxio.client.block.stream.GrpcDataReader;
import alluxio.client.block.stream.LocalFileDataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.InStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.ReadRequest;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.io.BufferUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputStream} over a single block. Data is pulled chunk by chunk from a
 * {@link DataReader} that is lazily created by a {@link DataReader.Factory}; the factory decides
 * whether the bytes come from a local file reader or from a gRPC reader.
 *
 * <p>The stream supports sequential reads, {@link #seek(long)}, {@link #skip(long)} and
 * stateless positioned reads via {@link #positionedRead(long, byte[], int, int)}.
 *
 * <p>Instances are not thread safe.
 */
@NotThreadSafe
public class BlockInStream extends InputStream implements BoundedStream, Seekable, PositionedReadable {
    private static final Logger LOG = LoggerFactory.getLogger(BlockInStream.class);

    /** Address of the worker this stream reads from. */
    private final WorkerNetAddress mAddress;
    /** The kind of source the stream was created for. */
    private final BlockInStreamSource mInStreamSource;
    /** The id of the block being read. */
    private final long mId;
    /** The length of the block, in bytes. */
    private final long mLength;
    /** The chunk currently being consumed by sequential reads, or null if none is buffered. */
    private DataBuffer mCurrentChunk;
    /** The reader backing sequential reads, or null if none is currently open. */
    private DataReader mDataReader;
    /** Creates readers for both sequential and positioned reads. */
    private final DataReader.Factory mDataReaderFactory;
    /** Scratch buffer used by {@link #read()}. */
    private final byte[] mSingleByte = new byte[1];
    /** Current position of the sequential read cursor within the block. */
    private long mPos = 0;
    /** Set once {@link #close()} has closed the data reader. */
    private boolean mClosed = false;
    /** Set once a sequential read has observed that the reader has no more chunks. */
    private boolean mEOF = false;

    /** The source a {@link BlockInStream} reads from. */
    public enum BlockInStreamSource {
        LOCAL,
        REMOTE,
        UFS
    }

    /**
     * Creates a {@link BlockInStream} for the given block.
     *
     * <p>A short circuit (local file) stream is attempted when the source is
     * {@link BlockInStreamSource#LOCAL}, short circuit is enabled in the cluster configuration and
     * the worker does not support domain sockets. If that attempt fails with a
     * {@link NotFoundException}, or the conditions are not met, a gRPC stream is created instead.
     *
     * @param fileSystemContext the file system context
     * @param blockInfo supplies the block id and block length
     * @param workerNetAddress the worker to read from
     * @param blockInStreamSource the source type of the data
     * @param inStreamOptions the in-stream options (read type, UFS block options)
     * @return the created stream
     * @throws IOException if creating the short circuit stream fails with an I/O error
     */
    public static BlockInStream create(FileSystemContext fileSystemContext, BlockInfo blockInfo, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, InStreamOptions inStreamOptions) throws IOException {
        // The returned status is not used here; the call is retained to preserve existing behavior.
        inStreamOptions.getStatus();
        ReadType readType = ReadType.fromProto(inStreamOptions.getOptions().getReadType());
        long blockId = blockInfo.getBlockId();
        long blockSize = blockInfo.getLength();

        // Build the partial read request; the chunk size is filled in by createGrpcBlockInStream.
        ReadRequest.Builder requestBuilder = ReadRequest.newBuilder().setBlockId(blockId).setPromote(readType.isPromote());
        requestBuilder.setOpenUfsBlockOptions(inStreamOptions.getOpenUfsBlockOptions(blockId));

        AlluxioConfiguration clusterConf = fileSystemContext.getClusterConf();
        boolean shortCircuitEnabled = clusterConf.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED);
        boolean domainSocketSupported = NettyUtils.isDomainSocketSupported(workerNetAddress);
        boolean sourceIsLocal = blockInStreamSource == BlockInStreamSource.LOCAL;

        if (sourceIsLocal && shortCircuitEnabled && !domainSocketSupported) {
            LOG.debug("Creating short circuit input stream for block {} @ {}", blockId, workerNetAddress);
            try {
                return createLocalBlockInStream(fileSystemContext, workerNetAddress, blockId, blockSize, inStreamOptions);
            } catch (NotFoundException e) {
                // Deliberately not rethrown: fall through and read over gRPC instead.
                LOG.warn("Failed to create short circuit input stream for block {} @ {}. Falling back to network transfer", blockId, workerNetAddress);
            }
        }

        LOG.debug("Creating gRPC input stream for block {} @ {} from client {} reading through {}", blockId, workerNetAddress, NetworkAddressUtils.getClientHostName(clusterConf), workerNetAddress);
        return createGrpcBlockInStream(fileSystemContext, workerNetAddress, blockInStreamSource, requestBuilder.buildPartial(), blockSize, inStreamOptions);
    }

    /**
     * Creates a stream backed by a {@link LocalFileDataReader.Factory}.
     *
     * @param fileSystemContext the file system context
     * @param workerNetAddress the worker address
     * @param blockId the block id
     * @param length the block length
     * @param inStreamOptions the in-stream options
     * @return the created stream, tagged with {@link BlockInStreamSource#LOCAL}
     */
    private static BlockInStream createLocalBlockInStream(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long blockId, long length, InStreamOptions inStreamOptions) throws IOException {
        long chunkSize = fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_LOCAL_READER_CHUNK_SIZE_BYTES);
        DataReader.Factory factory = new LocalFileDataReader.Factory(fileSystemContext, workerNetAddress, blockId, chunkSize, inStreamOptions);
        return new BlockInStream(factory, workerNetAddress, BlockInStreamSource.LOCAL, blockId, length);
    }

    /**
     * Creates a stream backed by a {@link GrpcDataReader.Factory}, completing the given partial
     * read request with the configured network reader chunk size.
     *
     * @param fileSystemContext the file system context
     * @param workerNetAddress the worker address
     * @param blockInStreamSource the source type of the data
     * @param readRequest the partial read request; its block id becomes the stream's id
     * @param length the block length
     * @param inStreamOptions the in-stream options (currently unused by this method)
     * @return the created stream
     */
    private static BlockInStream createGrpcBlockInStream(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, ReadRequest readRequest, long length, InStreamOptions inStreamOptions) {
        long chunkSize = fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_NETWORK_READER_CHUNK_SIZE_BYTES);
        ReadRequest.Builder builder = readRequest.toBuilder();
        builder.setChunkSize(chunkSize);
        DataReader.Factory factory = new GrpcDataReader.Factory(fileSystemContext, workerNetAddress, builder.build());
        return new BlockInStream(factory, workerNetAddress, blockInStreamSource, readRequest.getBlockId(), length);
    }

    /**
     * Creates a stream that always reads through a {@link GrpcDataReader.Factory}.
     *
     * @param fileSystemContext the file system context
     * @param blockId the block id
     * @param workerNetAddress the worker address
     * @param blockInStreamSource the source type of the data
     * @param length the block length
     * @param openUfsBlockOptions the UFS block options placed in the read request
     * @return the created stream
     */
    public static BlockInStream createRemoteBlockInStream(FileSystemContext fileSystemContext, long blockId, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, long length, Protocol.OpenUfsBlockOptions openUfsBlockOptions) {
        long chunkSize = fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_NETWORK_READER_CHUNK_SIZE_BYTES);
        ReadRequest readRequest = ReadRequest.newBuilder()
                .setBlockId(blockId)
                .setOpenUfsBlockOptions(openUfsBlockOptions)
                .setChunkSize(chunkSize)
                .buildPartial();
        DataReader.Factory factory = new GrpcDataReader.Factory(fileSystemContext, workerNetAddress, readRequest.toBuilder().buildPartial());
        return new BlockInStream(factory, workerNetAddress, blockInStreamSource, blockId, length);
    }

    /**
     * Creates an instance of {@link BlockInStream}.
     *
     * @param factory the factory used to create data readers
     * @param workerNetAddress the address of the worker being read from
     * @param blockInStreamSource the source type of the data
     * @param id the block id
     * @param length the block length
     */
    protected BlockInStream(DataReader.Factory factory, WorkerNetAddress workerNetAddress, BlockInStreamSource blockInStreamSource, long id, long length) {
        this.mDataReaderFactory = factory;
        this.mAddress = workerNetAddress;
        this.mInStreamSource = blockInStreamSource;
        this.mId = id;
        this.mLength = length;
    }

    /**
     * @return the current position of the sequential read cursor
     */
    public long getPos() {
        return this.mPos;
    }

    /**
     * Reads a single byte.
     *
     * @return the byte read as an int, or -1 at end of stream
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        int bytesRead = read(this.mSingleByte);
        if (bytesRead == -1) {
            return -1;
        }
        Preconditions.checkState(bytesRead == 1);
        return BufferUtils.byteToInt(this.mSingleByte[0]);
    }

    /**
     * Reads into the whole of {@code b}; equivalent to {@code read(b, 0, b.length)}.
     *
     * @param b the destination buffer
     * @return the number of bytes read, or -1 at end of stream
     */
    @Override // java.io.InputStream
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Reads up to {@code len} bytes from the current position. At most the remainder of the
     * current chunk is returned, so fewer than {@code len} bytes may be read.
     *
     * @param b the destination buffer
     * @param off the offset in {@code b} at which to start writing
     * @param len the maximum number of bytes to read
     * @return the number of bytes read, 0 if {@code len} is 0, or -1 at end of stream
     * @throws IllegalStateException if the stream is closed, or if the reader runs out of data
     *         before the position reaches the block length
     * @throws IllegalArgumentException if {@code b} is null or the offset/length are out of bounds
     */
    @Override // java.io.InputStream
    public int read(byte[] b, int off, int len) throws IOException {
        checkIfClosed();
        Preconditions.checkArgument(b != null, PreconditionMessage.ERR_READ_BUFFER_NULL);
        // Written as "len <= b.length - off" so the check cannot be defeated by int overflow of
        // "off + len".
        Preconditions.checkArgument(off >= 0 && len >= 0 && len <= b.length - off, PreconditionMessage.ERR_BUFFER_STATE.toString(), b.length, off, len);
        if (len == 0) {
            return 0;
        }
        readChunk();
        if (this.mCurrentChunk == null) {
            this.mEOF = true;
        }
        if (this.mEOF) {
            closeDataReader();
            Preconditions.checkState(this.mPos >= this.mLength, PreconditionMessage.BLOCK_LENGTH_INCONSISTENT.toString(), this.mId, this.mLength, this.mPos);
            return -1;
        }
        int toRead = Math.min(len, this.mCurrentChunk.readableBytes());
        this.mCurrentChunk.readBytes(b, off, toRead);
        this.mPos += toRead;
        return toRead;
    }

    /**
     * Reads up to {@code len} bytes starting at block offset {@code pos}, using a dedicated
     * {@link DataReader} so the sequential read cursor is left untouched.
     *
     * <p>The method keeps pulling chunks until {@code len} bytes have been copied or the reader
     * reports no more data, then closes the reader.
     *
     * @param pos the position in the block to start reading from
     * @param b the destination buffer
     * @param off the offset in {@code b} at which to start writing
     * @param len the maximum number of bytes to read
     * @return the number of bytes read; 0 if {@code len} is 0; -1 if {@code pos} is outside the
     *         block or no bytes could be read
     */
    @Override // alluxio.client.PositionedReadable
    public int positionedRead(long pos, byte[] b, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (pos < 0 || pos >= this.mLength) {
            return -1;
        }
        int remaining = len;
        int writeOffset = off;
        // One reader serves the whole request; it is closed only after the loop finishes.
        try (DataReader reader = this.mDataReaderFactory.create(pos, len)) {
            while (remaining > 0) {
                DataBuffer chunk = null;
                try {
                    chunk = reader.readChunk();
                    if (chunk == null) {
                        // The reader has no more data.
                        break;
                    }
                    int chunkBytes = chunk.readableBytes();
                    Preconditions.checkState(chunkBytes <= remaining);
                    chunk.readBytes(b, writeOffset, chunkBytes);
                    remaining -= chunkBytes;
                    writeOffset += chunkBytes;
                } finally {
                    if (chunk != null) {
                        chunk.release();
                    }
                }
            }
        }
        int bytesRead = len - remaining;
        return bytesRead == 0 ? -1 : bytesRead;
    }

    /**
     * @return 0 once end of stream has been observed, otherwise the block length minus the
     *         current position
     */
    @Override // alluxio.client.BoundedStream
    public long remaining() {
        if (this.mEOF) {
            return 0L;
        }
        return this.mLength - this.mPos;
    }

    /**
     * Moves the sequential read cursor to {@code pos}. Any open data reader is closed so that the
     * next read creates a new one at the new position. Seeking backwards clears the end-of-stream
     * flag.
     *
     * @param pos the target position, between 0 and the block length inclusive
     * @throws IllegalStateException if the stream is closed
     * @throws IllegalArgumentException if {@code pos} is negative or past the block length
     */
    public void seek(long pos) throws IOException {
        checkIfClosed();
        Preconditions.checkArgument(pos >= 0, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), pos);
        Preconditions.checkArgument(pos <= this.mLength, PreconditionMessage.ERR_SEEK_PAST_END_OF_REGION.toString(), this.mId);
        if (pos == this.mPos) {
            return;
        }
        if (pos < this.mPos) {
            this.mEOF = false;
        }
        closeDataReader();
        this.mPos = pos;
    }

    /**
     * Skips forward by up to {@code n} bytes, bounded by {@link #remaining()}, and closes any open
     * data reader.
     *
     * @param n the number of bytes to skip
     * @return the number of bytes actually skipped; 0 if {@code n} is not positive
     * @throws IllegalStateException if the stream is closed
     */
    @Override // java.io.InputStream
    public long skip(long n) throws IOException {
        checkIfClosed();
        if (n <= 0) {
            return 0L;
        }
        long toSkip = Math.min(remaining(), n);
        this.mPos += toSkip;
        closeDataReader();
        return toSkip;
    }

    /**
     * Closes the current data reader and marks the stream closed; the reader factory is closed
     * even if closing the data reader fails.
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            closeDataReader();
            this.mClosed = true;
        } finally {
            this.mDataReaderFactory.close();
        }
    }

    /**
     * @return whether the underlying reader factory reports itself as short circuit
     */
    public boolean isShortCircuit() {
        return this.mDataReaderFactory.isShortCircuit();
    }

    /**
     * Ensures {@code mCurrentChunk} holds unread data if any is available: lazily creates the data
     * reader at the current position, releases a fully consumed chunk, and fetches the next chunk
     * when none is buffered. {@code mCurrentChunk} is left null if the reader returns no chunk.
     */
    private void readChunk() throws IOException {
        if (this.mDataReader == null) {
            this.mDataReader = this.mDataReaderFactory.create(this.mPos, this.mLength - this.mPos);
        }
        if (this.mCurrentChunk != null && this.mCurrentChunk.readableBytes() == 0) {
            this.mCurrentChunk.release();
            this.mCurrentChunk = null;
        }
        if (this.mCurrentChunk == null) {
            this.mCurrentChunk = this.mDataReader.readChunk();
        }
    }

    /**
     * Releases the buffered chunk, if any, and closes and clears the current data reader, if any.
     */
    private void closeDataReader() throws IOException {
        if (this.mCurrentChunk != null) {
            this.mCurrentChunk.release();
            this.mCurrentChunk = null;
        }
        if (this.mDataReader != null) {
            this.mDataReader.close();
        }
        this.mDataReader = null;
    }

    /**
     * Fails with {@link IllegalStateException} if the stream has been closed.
     */
    private void checkIfClosed() {
        Preconditions.checkState(!this.mClosed, PreconditionMessage.ERR_CLOSED_BLOCK_IN_STREAM);
    }

    /**
     * @return the address of the worker this stream reads from
     */
    public WorkerNetAddress getAddress() {
        return this.mAddress;
    }

    /**
     * @return the source type this stream was created with
     */
    public BlockInStreamSource getSource() {
        return this.mInStreamSource;
    }

    /**
     * @return the id of the block being read
     */
    public long getId() {
        return this.mId;
    }
}
