package alluxio.client.block.stream;

import alluxio.client.block.stream.DataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.grpc.ReadRequest;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.resource.LockResource;
import alluxio.wire.WorkerNetAddress;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataReader} that reads a block through a {@link BufferCachingGrpcDataReader} which is
 * shared, per block id, between all {@code SharedGrpcDataReader} instances in this JVM.
 *
 * <p>The shared readers live in a static map keyed by block id. Creation and removal of a map
 * entry are guarded by one of {@link #BLOCK_LOCK_NUM} striped locks selected from the block id.
 * Each instance only tracks its own read position; the chunk data itself comes from the shared
 * reader.
 */
@NotThreadSafe
public class SharedGrpcDataReader implements DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(SharedGrpcDataReader.class);
    /** Number of striped locks guarding {@link #BLOCK_READERS}. */
    private static final int BLOCK_LOCK_NUM = 32;
    /** Striped locks; the lock for a block is picked by hashing its id, see {@link #getLock}. */
    private static final ReentrantReadWriteLock[] BLOCK_LOCKS = new ReentrantReadWriteLock[BLOCK_LOCK_NUM];
    /** Shared underlying readers, keyed by block id. */
    private static final ConcurrentHashMap<Long, BufferCachingGrpcDataReader> BLOCK_READERS = new ConcurrentHashMap<>();
    private static final HashFunction HASH_FUNC = Hashing.murmur3_32();

    static {
        for (int i = 0; i < BLOCK_LOCK_NUM; i++) {
            BLOCK_LOCKS[i] = new ReentrantReadWriteLock();
        }
    }

    private final long mBlockId;
    private final BufferCachingGrpcDataReader mCachedDataReader;
    private final long mChunkSize;
    /** Next position this instance will read from. */
    private long mPosToRead;

    /**
     * Factory that hands out {@link SharedGrpcDataReader}s, creating the shared underlying reader
     * for the block on first use.
     */
    public static class Factory implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final ReadRequest mReadRequestPartial;
        private final long mBlockSize;

        /**
         * Creates a factory.
         *
         * @param context the file system context passed to the underlying reader
         * @param address the worker address passed to the underlying reader
         * @param readRequestPartial the read request template; offset and length are filled in later
         * @param blockSize the length requested from the shared underlying reader
         */
        public Factory(FileSystemContext context, WorkerNetAddress address, ReadRequest readRequestPartial, long blockSize) {
            this.mContext = context;
            this.mAddress = address;
            this.mReadRequestPartial = readRequestPartial;
            this.mBlockSize = blockSize;
        }

        /**
         * Returns a reader positioned at {@code offset}. The shared underlying reader for the block
         * is looked up, or created with offset 0 and length {@code mBlockSize} when absent, and its
         * reference count is incremented, all while holding the block's write lock.
         *
         * @param offset the offset the returned reader starts at
         * @param len the length set on the returned reader's read request
         * @return a new {@link SharedGrpcDataReader}
         * @throws IOException if the underlying reader cannot be created
         */
        @Override
        @SuppressFBWarnings(value = {"AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION"}, justification = "operation is still atomic guarded by block lock")
        public DataReader create(long offset, long len) throws IOException {
            long blockId = mReadRequestPartial.getBlockId();
            BufferCachingGrpcDataReader sharedReader;
            // The get/create/put/ref sequence must not interleave with a concurrent close().
            try (LockResource ignored = new LockResource(getLock(blockId).writeLock())) {
                sharedReader = BLOCK_READERS.get(blockId);
                if (sharedReader == null) {
                    // No reader registered for this block yet: create one covering offset 0..mBlockSize.
                    ReadRequest wholeBlockRequest =
                            mReadRequestPartial.toBuilder().setOffset(0L).setLength(mBlockSize).build();
                    sharedReader = BufferCachingGrpcDataReader.create(mContext, mAddress, wholeBlockRequest);
                    BLOCK_READERS.put(blockId, sharedReader);
                }
                sharedReader.ref();
            }
            // Built outside the locked region so the lock is released exactly once.
            ReadRequest request = mReadRequestPartial.toBuilder().setOffset(offset).setLength(len).build();
            return new SharedGrpcDataReader(request, sharedReader);
        }

        @Override
        public boolean isShortCircuit() {
            return false;
        }

        @Override
        public void close() throws IOException {
            // Nothing to release: the factory holds no resources of its own.
        }
    }

    /**
     * Picks the striped lock for a block.
     *
     * @param blockId the block id
     * @return the lock guarding the {@link #BLOCK_READERS} entry of {@code blockId}
     */
    public static ReentrantReadWriteLock getLock(long blockId) {
        // asInt() may be negative and '%' keeps the dividend's sign, which would yield a negative
        // array index; floorMod always returns a value in [0, BLOCK_LOCKS.length).
        return BLOCK_LOCKS[Math.floorMod(HASH_FUNC.hashLong(blockId).asInt(), BLOCK_LOCKS.length)];
    }

    /**
     * Creates a reader on top of an existing shared reader.
     *
     * @param readRequest supplies the chunk size, the initial offset and the block id
     * @param reader the shared underlying reader to read chunks from
     */
    @VisibleForTesting
    protected SharedGrpcDataReader(ReadRequest readRequest, BufferCachingGrpcDataReader reader) {
        this.mChunkSize = readRequest.getChunkSize();
        this.mPosToRead = readRequest.getOffset();
        this.mBlockId = readRequest.getBlockId();
        this.mCachedDataReader = reader;
    }

    /**
     * @return the position the next {@link #readChunk()} will read from
     */
    @Override
    public long pos() {
        return mPosToRead;
    }

    /**
     * Moves the read position.
     *
     * @param pos the position the next {@link #readChunk()} will read from
     */
    public void seek(long pos) {
        mPosToRead = pos;
    }

    /**
     * Reads from the current position up to the end of the chunk containing it, then advances the
     * position to the start of the next chunk.
     *
     * @return a buffer over the rest of the current chunk, or {@code null} if the underlying reader
     *         returns no chunk for the current index
     * @throws IOException if the underlying reader fails
     */
    @Override
    @Nullable
    public DataBuffer readChunk() throws IOException {
        int chunkIndex = (int) (mPosToRead / mChunkSize);
        DataBuffer chunk = mCachedDataReader.readChunk(chunkIndex);
        if (chunk == null) {
            return null;
        }
        long offsetInChunk = mPosToRead % mChunkSize;
        ByteBuffer view = chunk.getReadOnlyByteBuffer();
        // Skip the part of the chunk that lies before the current position.
        view.position((int) offsetInChunk);
        // Align the position to the next chunk boundary.
        mPosToRead += mChunkSize - offsetInChunk;
        return new NioDataBuffer(view, view.remaining());
    }

    /**
     * Drops this instance's reference on the shared reader. If the reference count reaches zero,
     * the shared reader is removed from {@link #BLOCK_READERS} and closed under the block's write
     * lock, provided the count is still zero once the lock is held.
     *
     * @throws IOException if closing the shared reader fails
     */
    @Override
    public void close() throws IOException {
        if (mCachedDataReader.deRef() > 0) {
            // Other readers still use the shared reader.
            return;
        }
        try (LockResource ignored = new LockResource(getLock(mBlockId).writeLock())) {
            // Re-check under the lock: a concurrent create() may have taken a new reference.
            if (mCachedDataReader.getRefCount() == 0) {
                BLOCK_READERS.remove(mBlockId);
                mCachedDataReader.close();
            }
        }
    }
}
