package alluxio.client.file;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.Seekable;
import alluxio.client.AlluxioStorageType;
import alluxio.client.BoundedStream;
import alluxio.client.PositionedReadable;
import alluxio.client.block.AlluxioBlockStore;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.NotFoundException;
import alluxio.master.block.BlockId;
import alluxio.proto.dataserver.Protocol;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.io.InputStream;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/file/FileInStream.class */
public class FileInStream extends InputStream implements BoundedStream, Seekable, PositionedReadable {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FileInStream.class);
    /**
     * Value of {@code USER_FILE_PASSIVE_CACHE_ENABLED}, read once at class load; when false,
     * {@code updateStreams()} never opens a cache stream.
     */
    private static final boolean PASSIVE_CACHE_ENABLED = Configuration.getBoolean(PropertyKey.USER_FILE_PASSIVE_CACHE_ENABLED);
    /** Options supplied at construction; forwarded to the block store when opening block streams. */
    private final InStreamOptions mInStreamOptions;
    /** Storage type taken from the in-stream options. */
    protected final AlluxioStorageType mAlluxioStorageType;
    /** Block size in bytes, taken from the file status. */
    protected final long mBlockSize;
    /** File length in bytes, taken from the file status at construction. */
    protected final long mFileLength;
    /** Client context; used here to look up the local worker. */
    protected final FileSystemContext mContext;
    /** Block store used to open block input streams and cache output streams. */
    private final AlluxioBlockStore mBlockStore;
    /** Status of the file being read (block ids, UFS path, persistence flag, mount id). */
    protected URIStatus mStatus;
    /** Current read position within the file, in bytes. */
    protected long mPos;
    /** Copy of {@code InStreamOptions.isCachePartiallyReadBlock()}. */
    private final boolean mCachePartiallyReadBlock;
    /** True when the storage type reports {@code isStore()}. */
    private final boolean mShouldCache;
    /** Input stream for the block currently being read, or null when none is open. */
    protected BlockInStream mCurrentBlockInStream;
    /** Output stream that receives a copy of the bytes read, or null when not caching. */
    protected BlockOutStream mCurrentCacheStream;
    /** Block id recorded by the last stream update in {@code updateStreams()}. */
    private long mStreamBlockId;
    /** Scratch buffer used by {@code readCurrentBlockToPos} to read and discard bytes. */
    private byte[] mSeekBuffer;
    /** Default out-stream options passed to the block store when opening a cache stream. */
    private final OutStreamOptions mOutStreamOptions = OutStreamOptions.defaults();
    /** Set to true once {@code close()} has completed. */
    protected boolean mClosed = false;

    /**
     * Creates the input stream implementation suited to the given file status.
     *
     * @param uRIStatus status of the file to read
     * @param inStreamOptions options controlling the read
     * @param fileSystemContext client context used by the stream
     * @return an {@code UnknownLengthFileInStream} when the status reports a length of -1,
     *         otherwise a plain {@code FileInStream}
     */
    public static FileInStream create(URIStatus uRIStatus, InStreamOptions inStreamOptions, FileSystemContext fileSystemContext) {
        if (uRIStatus.getLength() == -1) {
            return new UnknownLengthFileInStream(uRIStatus, inStreamOptions, fileSystemContext);
        }
        return new FileInStream(uRIStatus, inStreamOptions, fileSystemContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FileInStream(URIStatus uRIStatus, InStreamOptions inStreamOptions, FileSystemContext fileSystemContext) {
        this.mStatus = uRIStatus;
        this.mInStreamOptions = inStreamOptions;
        this.mBlockSize = uRIStatus.getBlockSizeBytes();
        this.mFileLength = uRIStatus.getLength();
        this.mContext = fileSystemContext;
        this.mAlluxioStorageType = inStreamOptions.getAlluxioStorageType();
        this.mShouldCache = this.mAlluxioStorageType.isStore();
        this.mCachePartiallyReadBlock = inStreamOptions.isCachePartiallyReadBlock();
        if (this.mShouldCache) {
            Preconditions.checkNotNull(inStreamOptions.getCacheLocationPolicy(), PreconditionMessage.FILE_WRITE_LOCATION_POLICY_UNSPECIFIED);
        }
        this.mSeekBuffer = new byte[Math.max((int) inStreamOptions.getSeekBufferSizeBytes(), 1)];
        this.mBlockStore = AlluxioBlockStore.create(fileSystemContext);
        LOG.debug("Init FileInStream with options {}", inStreamOptions);
    }

    /**
     * Closes this stream, releasing the current block stream and finishing or cancelling the
     * cache stream. Calling it again after it has run is a no-op.
     *
     * <p>When partial-block caching is enabled the remainder of the current block is read first
     * so the cache stream can be completed. If that preparatory work fails, the underlying
     * streams are still released and the stream is still marked closed before the exception
     * propagates, so a failed close does not leak the open block stream.
     *
     * @throws IOException if updating the streams, reading the rest of the block, or closing
     *         the block stream fails
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        try {
            updateStreams();
            if (shouldCachePartiallyReadBlock()) {
                readCurrentBlockToEnd();
            }
        } finally {
            try {
                if (this.mCurrentBlockInStream != null) {
                    this.mCurrentBlockInStream.close();
                }
            } finally {
                // Runs even if closing the block stream throws: closes the cache stream when
                // it is complete, cancels it otherwise.
                closeOrCancelCacheStream();
                this.mClosed = true;
            }
        }
    }

    /**
     * Returns the current read position.
     *
     * @return the current position in the file, in bytes
     */
    public long getPos() {
        return mPos;
    }

    /**
     * Reads a single byte from the current position.
     *
     * @return the byte read, or -1 when no bytes remain
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        final int nextByte = readInternal();
        return nextByte;
    }

    /**
     * Reads into the whole of {@code bArr}.
     *
     * @param bArr destination buffer
     * @return the number of bytes read, or -1 when no bytes remain
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr) throws IOException {
        final int capacity = bArr.length;
        return read(bArr, 0, capacity);
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}.
     *
     * @param bArr destination buffer
     * @param i offset in {@code bArr} at which to start writing
     * @param i2 maximum number of bytes to read
     * @return the number of bytes read, or -1 when no bytes remain
     * @throws IOException if the underlying read fails
     */
    @Override // java.io.InputStream
    public int read(byte[] bArr, int i, int i2) throws IOException {
        final int bytesRead = readInternal(bArr, i, i2);
        return bytesRead;
    }

    /**
     * Reads one byte from the current block stream, advancing the position and copying the
     * byte to the cache stream when one is open.
     *
     * @return the byte read, or -1 when the file or the underlying block stream is exhausted
     * @throws IOException if the block stream cannot be updated or read
     */
    private int readInternal() throws IOException {
        if (remainingInternal() <= 0) {
            return -1;
        }
        updateStreams();
        Preconditions.checkState(mCurrentBlockInStream != null, PreconditionMessage.ERR_UNEXPECTED_EOF);

        final int value = mCurrentBlockInStream.read();
        if (value != -1) {
            mPos++;
            if (mCurrentCacheStream != null) {
                try {
                    mCurrentCacheStream.write(value);
                } catch (IOException cacheFailure) {
                    // A caching failure must not fail the read itself.
                    handleCacheStreamException(cacheFailure);
                }
            }
        }
        return value;
    }

    /**
     * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i}, crossing
     * block boundaries as needed and copying the bytes to the cache stream when one is open.
     *
     * @param bArr destination buffer; must not be null
     * @param i offset in {@code bArr} at which to start writing; must be non-negative
     * @param i2 maximum number of bytes to read; must be non-negative and fit in {@code bArr}
     *        after {@code i}
     * @return the number of bytes read, 0 when {@code i2} is 0, or -1 when no bytes remain
     * @throws IllegalArgumentException if the buffer is null or the offset/length are invalid
     * @throws IOException if a block stream cannot be updated or read
     */
    private int readInternal(byte[] bArr, int i, int i2) throws IOException {
        Preconditions.checkArgument(bArr != null, PreconditionMessage.ERR_READ_BUFFER_NULL);
        // Compare against (length - offset) rather than (offset + length): the sum can overflow
        // int and wrap negative, which would let an out-of-range request pass the check.
        Preconditions.checkArgument(i >= 0 && i2 >= 0 && i2 <= bArr.length - i, PreconditionMessage.ERR_BUFFER_STATE.toString(), new Object[]{Integer.valueOf(bArr.length), Integer.valueOf(i), Integer.valueOf(i2)});
        if (i2 == 0) {
            return 0;
        }
        if (remainingInternal() <= 0) {
            return -1;
        }
        int currentOffset = i;
        int bytesLeftToRead = i2;
        while (bytesLeftToRead > 0 && remainingInternal() > 0) {
            updateStreams();
            Preconditions.checkNotNull(this.mCurrentBlockInStream, PreconditionMessage.ERR_UNEXPECTED_EOF);
            try {
                // Never ask the block stream for more than it has left.
                int bytesToRead = (int) Math.min(bytesLeftToRead, this.mCurrentBlockInStream.remaining());
                int bytesRead = this.mCurrentBlockInStream.read(bArr, currentOffset, bytesToRead);
                if (bytesRead > 0) {
                    if (this.mCurrentCacheStream != null) {
                        try {
                            this.mCurrentCacheStream.write(bArr, currentOffset, bytesRead);
                        } catch (IOException cacheFailure) {
                            // A caching failure must not fail the read itself.
                            handleCacheStreamException(cacheFailure);
                        }
                    }
                    this.mPos += bytesRead;
                    bytesLeftToRead -= bytesRead;
                    currentOffset += bytesRead;
                }
            } catch (IOException readFailure) {
                throw AlluxioStatusException.fromIOException(readFailure);
            }
        }
        if (bytesLeftToRead == i2 && this.mCurrentBlockInStream.remaining() == 0) {
            // Nothing was read and the underlying block stream is exhausted.
            return -1;
        }
        return i2 - bytesLeftToRead;
    }

    /**
     * Reads up to {@code i2} bytes starting at file offset {@code j} into {@code bArr}.
     *
     * @param j file offset to read from
     * @param bArr destination buffer
     * @param i offset in {@code bArr} at which to start writing
     * @param i2 maximum number of bytes to read
     * @return the result of the internal positioned read; -1 when {@code j} is outside the file
     * @throws IOException if the underlying read fails
     */
    @Override // alluxio.client.PositionedReadable
    public int positionedRead(long j, byte[] bArr, int i, int i2) throws IOException {
        final int bytesRead = positionedReadInternal(j, bArr, i, i2);
        return bytesRead;
    }

    /**
     * Reads up to {@code i2} bytes starting at file offset {@code j} into {@code bArr}.
     *
     * <p>When partial-block caching is enabled the read goes through the regular
     * seek/read path, under this object's monitor, and the previous position is restored
     * afterwards. Otherwise a short-lived block stream is opened for each block touched, so
     * the stream position {@code mPos} is left untouched.
     *
     * @param j file offset to read from
     * @param bArr destination buffer
     * @param i offset in {@code bArr} at which to start writing
     * @param i2 maximum number of bytes to read
     * @return the number of bytes actually read, or -1 when {@code j} is negative or at/after
     *         the end of the file
     * @throws IOException if a block stream cannot be opened, read, or closed
     */
    private int positionedReadInternal(long j, byte[] bArr, int i, int i2) throws IOException {
        if (j < 0 || j >= this.mFileLength) {
            return -1;
        }
        if (shouldCachePartiallyReadBlock()) {
            synchronized (this) {
                long savedPos = this.mPos;
                try {
                    seek(j);
                    return read(bArr, i, i2);
                } finally {
                    // Restore the caller-visible position whether or not the read succeeded.
                    seek(savedPos);
                }
            }
        }
        // Remember the requested length: i2 is decremented as bytes are consumed.
        final int requestedLength = i2;
        while (i2 > 0 && j < this.mFileLength) {
            long blockId = getBlockId(j);
            long offsetInBlock = j % this.mBlockSize;
            try (BlockInStream blockInStream = getBlockInStream(blockId)) {
                // Limit each read to the bytes left in the current block.
                int bytesToRead = (int) Math.min(this.mBlockSize - offsetInBlock, i2);
                int bytesRead = blockInStream.positionedRead(offsetInBlock, bArr, i, bytesToRead);
                Preconditions.checkState(bytesRead > 0, "No data is read before EOF");
                j += bytesRead;
                i += bytesRead;
                i2 -= bytesRead;
            }
        }
        return requestedLength - i2;
    }

    /**
     * Returns the number of bytes between the current position and the end of the file.
     *
     * @return the file length minus the current position
     */
    @Override // alluxio.client.BoundedStream
    public long remaining() {
        final long bytesLeft = remainingInternal();
        return bytesLeft;
    }

    /**
     * Moves the read position to {@code j}. Seeking to the current position is a no-op and
     * skips validation.
     *
     * @param j target position in the file; must be in {@code [0, maxSeekPosition()]}
     * @throws IllegalArgumentException if {@code j} is negative or past the maximum seek position
     * @throws IOException if repositioning the underlying streams fails
     */
    public void seek(long j) throws IOException {
        if (mPos != j) {
            Preconditions.checkArgument(j >= 0, PreconditionMessage.ERR_SEEK_NEGATIVE.toString(), new Object[]{Long.valueOf(j)});
            Preconditions.checkArgument(j <= maxSeekPosition(), PreconditionMessage.ERR_SEEK_PAST_END_OF_FILE.toString(), new Object[]{Long.valueOf(j)});
            if (!shouldCachePartiallyReadBlock()) {
                seekInternal(j);
            } else {
                seekInternalWithCachingPartiallyReadBlock(j);
            }
        }
    }

    /**
     * Tells whether partially read blocks should be cached.
     *
     * @return true only when caching is on and the partial-block caching option is set
     */
    private boolean shouldCachePartiallyReadBlock() {
        if (!mShouldCache) {
            return false;
        }
        return mCachePartiallyReadBlock;
    }

    /**
     * Skips forward by up to {@code j} bytes, never past the end of the file.
     *
     * @param j number of bytes to skip; non-positive values skip nothing
     * @return the number of bytes actually skipped
     * @throws IOException if the seek to the new position fails
     */
    @Override // java.io.InputStream
    public long skip(long j) throws IOException {
        if (j <= 0) {
            return 0L;
        }
        final long bytesToSkip = Math.min(j, remainingInternal());
        final long target = mPos + bytesToSkip;
        seek(target);
        return bytesToSkip;
    }

    /**
     * Returns the largest position {@code seek} will accept.
     *
     * @return the file length
     */
    protected long maxSeekPosition() {
        return mFileLength;
    }

    /**
     * Returns the block size for the block containing position {@code j}; this
     * implementation delegates to {@code getBlockSize}.
     *
     * @param j position in the file
     * @return the value of {@code getBlockSize(j)}
     */
    protected long getBlockSizeAllocation(long j) {
        final long allocation = getBlockSize(j);
        return allocation;
    }

    /**
     * Computes the size of the block containing position {@code j}.
     *
     * @param j position in the file
     * @return the full block size when more bytes than the trailing partial block remain after
     *         {@code j}; otherwise the size of that trailing partial block
     */
    protected long getBlockSize(long j) {
        final long trailingBlockBytes = mFileLength % mBlockSize;
        final long bytesFromPosition = mFileLength - j;
        if (bytesFromPosition > trailingBlockBytes) {
            return mBlockSize;
        }
        return trailingBlockBytes;
    }

    /**
     * Decides whether the block and cache streams need to be replaced before reading block
     * {@code j}.
     *
     * @param j id of the block about to be read
     * @return true when no block stream is open, when the open streams belong to a different
     *         block, or when the current block stream has no bytes left
     * @throws IllegalStateException if a cache stream is open and its remaining byte count
     *         differs from the block stream's
     */
    protected boolean shouldUpdateStreams(long j) {
        if (mCurrentBlockInStream == null) {
            return true;
        }
        if (j != mStreamBlockId) {
            return true;
        }
        if (mCurrentCacheStream != null) {
            long inRemaining = mCurrentBlockInStream.remaining();
            long cacheRemaining = mCurrentCacheStream.remaining();
            if (inRemaining != cacheRemaining) {
                throw new IllegalStateException(String.format("BlockInStream and CacheStream is out of sync %d %d.", Long.valueOf(inRemaining), Long.valueOf(cacheRemaining)));
            }
        }
        return mCurrentBlockInStream.remaining() == 0;
    }

    /**
     * Finishes the current cache stream, if any: closes it when every byte of the block has
     * been written and cancels it otherwise. Failures are logged and not propagated, and the
     * cache stream reference is cleared in all cases.
     */
    private void closeOrCancelCacheStream() {
        if (mCurrentCacheStream != null) {
            try {
                boolean fullyWritten = mCurrentCacheStream.remaining() == 0;
                if (!fullyWritten) {
                    mCurrentCacheStream.cancel();
                } else {
                    mCurrentCacheStream.close();
                }
            } catch (IOException e) {
                LOG.info("Closing or cancelling the cache stream encountered IOException {}, reading from the regular stream won't be affected.", e.getMessage());
            } catch (AlreadyExistsException e2) {
                LOG.info("Block {} exists.", Long.valueOf(getCurrentBlockId()));
            } catch (NotFoundException e3) {
                LOG.info("Block {} does not exist when being cancelled.", Long.valueOf(getCurrentBlockId()));
            }
            mCurrentCacheStream = null;
        }
    }

    /**
     * Returns the id of the block containing the current position.
     *
     * @return the block id, or -1 when no bytes remain in the file
     */
    private long getCurrentBlockId() {
        return remainingInternal() > 0 ? getBlockId(mPos) : -1L;
    }

    /**
     * Looks up the id of the block containing file position {@code j}.
     *
     * @param j position in the file
     * @return the block id stored in the file status at index {@code j / mBlockSize}
     * @throws IllegalStateException if that index is not smaller than the number of block ids
     */
    private long getBlockId(long j) {
        final int blockIndex = (int) (j / mBlockSize);
        final int blockCount = mStatus.getBlockIds().size();
        Preconditions.checkState(blockIndex < blockCount, PreconditionMessage.ERR_BLOCK_INDEX.toString(), new Object[]{Integer.valueOf(blockIndex), Long.valueOf(j), Integer.valueOf(blockCount)});
        Long blockId = (Long) mStatus.getBlockIds().get(blockIndex);
        return blockId.longValue();
    }

    /**
     * Logs a cache stream failure and then closes or cancels the cache stream.
     *
     * @param iOException the failure raised while opening or writing the cache stream; logged
     *        at info level when its root cause is an {@code AlreadyExistsException}, at warn
     *        level otherwise
     */
    private void handleCacheStreamException(IOException iOException) {
        final boolean alreadyStored = Throwables.getRootCause(iOException) instanceof AlreadyExistsException;
        if (!alreadyStored) {
            LOG.warn("The block with ID {} could not be cached into Alluxio storage: {}", Long.valueOf(getCurrentBlockId()), iOException.toString());
        } else {
            LOG.info("The block with ID {} is already stored in the target worker, canceling the cache request.", Long.valueOf(getCurrentBlockId()));
        }
        closeOrCancelCacheStream();
    }

    /**
     * Replaces the block stream, and the cache stream when passive caching is enabled, if
     * {@code shouldUpdateStreams} says the current ones are unusable for the current position.
     *
     * @throws IOException if the block stream cannot be closed or opened
     */
    private void updateStreams() throws IOException {
        final long blockId = getCurrentBlockId();
        if (!shouldUpdateStreams(blockId)) {
            return;
        }
        updateBlockInStream(blockId);
        if (PASSIVE_CACHE_ENABLED) {
            updateCacheStream(blockId);
        }
        mStreamBlockId = blockId;
    }

    /**
     * Finishes the previous cache stream and, when appropriate, opens a new one for block
     * {@code j} on the local worker.
     *
     * <p>A new cache stream is opened only when caching is on, the current block stream reads
     * from a remote source, the position is at a block boundary, and a local worker exists.
     *
     * @param j id of the block to cache; a negative value means there is no block to cache
     * @throws IllegalStateException if the previous cache stream still has bytes outstanding
     */
    private void updateCacheStream(long j) {
        Preconditions.checkState(mCurrentCacheStream == null || mCurrentCacheStream.remaining() == 0);
        closeOrCancelCacheStream();
        Preconditions.checkState(mCurrentCacheStream == null);
        if (j < 0) {
            return;
        }
        Preconditions.checkNotNull(mCurrentBlockInStream, "mCurrentBlockInStream");
        if (!mShouldCache) {
            return;
        }
        if (mCurrentBlockInStream.Source() != BlockInStream.BlockInStreamSource.REMOTE) {
            return;
        }
        if (mPos % mBlockSize != 0) {
            return;
        }
        try {
            WorkerNetAddress localWorker = mContext.getLocalWorker();
            if (localWorker != null) {
                mCurrentCacheStream = mBlockStore.getOutStream(j, getBlockSize(mPos), localWorker, mOutStreamOptions);
            }
        } catch (IOException openFailure) {
            handleCacheStreamException(openFailure);
        }
    }

    /**
     * Closes the current block stream, if any, and opens one for block {@code j}.
     *
     * @param j id of the block to open; a negative value leaves no block stream open
     * @throws IOException if the old stream cannot be closed or the new one cannot be opened
     */
    private void updateBlockInStream(long j) throws IOException {
        final BlockInStream previous = mCurrentBlockInStream;
        if (previous != null) {
            previous.close();
            mCurrentBlockInStream = null;
        }
        if (j >= 0) {
            mCurrentBlockInStream = getBlockInStream(j);
        }
    }

    /**
     * Opens an input stream for block {@code j} through the block store.
     *
     * <p>For persisted files, UFS open options describing the block's offset and size within
     * the file are built and passed along; otherwise null options are passed.
     *
     * @param j id of the block to open
     * @return a new block input stream
     * @throws IOException if the block store cannot open the stream
     */
    private BlockInStream getBlockInStream(long j) throws IOException {
        Protocol.OpenUfsBlockOptions ufsOptions = null;
        if (mStatus.isPersisted()) {
            final long blockStart = BlockId.getSequenceNumber(j) * mBlockSize;
            ufsOptions = Protocol.OpenUfsBlockOptions.newBuilder()
                    .setUfsPath(mStatus.getUfsPath())
                    .setOffsetInFile(blockStart)
                    .setBlockSize(getBlockSize(blockStart))
                    .setMaxUfsReadConcurrency(mInStreamOptions.getMaxUfsReadConcurrency())
                    .setNoCache(!mInStreamOptions.getAlluxioStorageType().isStore())
                    .setMountId(mStatus.getMountId())
                    .build();
        }
        return mBlockStore.getInStream(j, ufsOptions, mInStreamOptions);
    }

    /**
     * Moves the position to {@code j} without caching the skipped bytes: the cache stream is
     * closed or cancelled, the streams are refreshed, and the block stream is positioned at
     * the matching offset within its block.
     *
     * @param j target position in the file
     * @throws IllegalStateException if no block stream is open although bytes remain
     * @throws IOException if the streams cannot be updated or the block stream cannot seek
     */
    private void seekInternal(long j) throws IOException {
        closeOrCancelCacheStream();
        mPos = j;
        updateStreams();
        if (mCurrentBlockInStream == null) {
            Preconditions.checkState(remainingInternal() == 0);
            return;
        }
        mCurrentBlockInStream.seek(mPos % mBlockSize);
    }

    /**
     * Computes the bytes left between the current position and the end of the file.
     *
     * @return the file length minus the current position
     */
    private long remainingInternal() {
        final long fileLength = mFileLength;
        return fileLength - mPos;
    }

    /**
     * Moves the position to {@code j} while keeping the cache stream consistent, by reading
     * (and thereby caching) the bytes that a plain seek would skip.
     *
     * @param j target position in the file
     * @throws IllegalStateException if a stream invariant checked below does not hold
     * @throws IOException if the streams cannot be updated, read, or repositioned
     */
    private void seekInternalWithCachingPartiallyReadBlock(long j) throws IOException {
        // z: the target position lies in the same block as the current position.
        boolean z = j / this.mBlockSize == this.mPos / this.mBlockSize;
        if (z && isReadFromLocalWorker()) {
            // Same block and the source is local: just reposition the block stream.
            this.mPos = j;
            updateStreams();
            if (this.mCurrentBlockInStream != null) {
                this.mCurrentBlockInStream.seek(this.mPos % this.mBlockSize);
                return;
            } else {
                Preconditions.checkState(remaining() == 0);
                return;
            }
        }
        // Finish the current block first, unless nothing has been opened yet at position 0 with
        // the target in another block, or the source is local, or it is remote with no local
        // worker.
        if (!(this.mPos == 0 && this.mCurrentBlockInStream == null && !z) && !isReadFromLocalWorker() && !isRemoteReadButNoLocalWorker()) {
            updateStreams();
            if (!z || j <= this.mPos) {
                // Target is in another block, or not ahead of the current position: read out
                // the remainder of the current block.
                readCurrentBlockToEnd();
            } else {
                // Target is ahead within the same block: read forward up to it.
                readCurrentBlockToPos(j);
            }
            if (this.mPos == j) {
                return;
            }
            // The cache stream must be absent or complete before it is closed here.
            Preconditions.checkState(this.mCurrentCacheStream == null || this.mCurrentCacheStream.remaining() == 0);
            closeOrCancelCacheStream();
        }
        // Restart from the beginning of the block that contains the target.
        this.mPos = (j / this.mBlockSize) * this.mBlockSize;
        updateStreams();
        if (!isReadFromLocalWorker() && !isRemoteReadButNoLocalWorker()) {
            // Read from the block start up to the target through the normal read path.
            readCurrentBlockToPos(j);
        } else if (this.mCurrentBlockInStream != null) {
            seekInternal(j);
        } else {
            Preconditions.checkState(remaining() == 0);
        }
    }

    /**
     * Tells whether the current block stream reads from a local source.
     *
     * @return true when a block stream is open and its source is {@code LOCAL}
     */
    private boolean isReadFromLocalWorker() {
        if (mCurrentBlockInStream == null) {
            return false;
        }
        return mCurrentBlockInStream.Source() == BlockInStream.BlockInStreamSource.LOCAL;
    }

    /**
     * Tells whether the current block stream reads remotely while no local worker exists.
     *
     * @return true when a block stream is open, its source is {@code REMOTE}, and the context
     *         reports no local worker
     * @throws IOException if the local worker lookup fails
     */
    private boolean isRemoteReadButNoLocalWorker() throws IOException {
        if (mCurrentBlockInStream == null) {
            return false;
        }
        if (mCurrentBlockInStream.Source() != BlockInStream.BlockInStreamSource.REMOTE) {
            return false;
        }
        return mContext.getLocalWorker() == null;
    }

    /**
     * Reads and discards bytes from the current block until position {@code j} or the end of
     * the block, whichever comes first. Reads go through {@code readInternal}, so the bytes
     * are also written to the cache stream when one is open.
     *
     * @param j file position to read up to
     * @throws IllegalStateException if a read returns no bytes before the target is reached
     * @throws IOException if a read fails
     */
    private void readCurrentBlockToPos(long j) throws IOException {
        if (mCurrentBlockInStream == null) {
            return;
        }
        long bytesLeft = Math.min(j - mPos, mCurrentBlockInStream.remaining());
        while (bytesLeft > 0) {
            int chunkSize = (int) Math.min(mSeekBuffer.length, bytesLeft);
            int bytesRead = readInternal(mSeekBuffer, 0, chunkSize);
            Preconditions.checkState(bytesRead > 0, PreconditionMessage.ERR_UNEXPECTED_EOF);
            bytesLeft -= bytesRead;
        }
    }

    /**
     * Reads and discards the rest of the current block.
     *
     * @throws IOException if a read fails
     */
    private void readCurrentBlockToEnd() throws IOException {
        // An unreachable target position makes the helper stop at the end of the block.
        final long noPositionLimit = Long.MAX_VALUE;
        readCurrentBlockToPos(noPositionLimit);
    }
}
