package alluxio.client.block.stream;

import alluxio.Configuration;
import alluxio.PropertyKey;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.client.netty.NettyRPC;
import alluxio.client.netty.NettyRPCContext;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockWriter;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.io.Closeable;
import java.io.IOException;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/LocalFilePacketWriter.class */
public final class LocalFilePacketWriter implements PacketWriter {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFilePacketWriter.class);
    private static final long FILE_BUFFER_BYTES = Configuration.getBytes(PropertyKey.USER_FILE_BUFFER_BYTES);
    private static final long WRITE_TIMEOUT_MS = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
    private final Channel mChannel;
    private final LocalFileBlockWriter mWriter;
    private final long mBlockId;
    private final long mPacketSize;
    private final ProtoMessage mCreateRequest;
    private final NettyRPCContext mNettyRPCContext;
    private final OutStreamOptions mOptions;
    private final Closer mCloser;
    private long mPos;
    private long mPosReserved;
    private boolean mClosed = false;

    public static LocalFilePacketWriter create(final FileSystemContext fileSystemContext, final WorkerNetAddress workerNetAddress, long j, OutStreamOptions outStreamOptions) throws IOException {
        long bytes = Configuration.getBytes(PropertyKey.USER_LOCAL_WRITER_PACKET_SIZE_BYTES);
        Closer create = Closer.create();
        try {
            final Channel acquireNettyChannel = fileSystemContext.acquireNettyChannel(workerNetAddress);
            create.register(new Closeable() { // from class: alluxio.client.block.stream.LocalFilePacketWriter.1
                @Override // java.io.Closeable, java.lang.AutoCloseable
                public void close() throws IOException {
                    FileSystemContext.this.releaseNettyChannel(workerNetAddress, acquireNettyChannel);
                }
            });
            ProtoMessage protoMessage = new ProtoMessage(Protocol.LocalBlockCreateRequest.newBuilder().setBlockId(j).setTier(outStreamOptions.getWriteTier()).setSpaceToReserve(FILE_BUFFER_BYTES).build());
            NettyRPCContext timeout = NettyRPCContext.defaults().setChannel(acquireNettyChannel).setTimeout(WRITE_TIMEOUT_MS);
            ProtoMessage call = NettyRPC.call(timeout, protoMessage);
            Preconditions.checkState(call.isLocalBlockCreateResponse());
            return new LocalFilePacketWriter(j, bytes, outStreamOptions, acquireNettyChannel, create.register(new LocalFileBlockWriter(call.asLocalBlockCreateResponse().getPath())), protoMessage, timeout, create);
        } catch (Exception e) {
            throw CommonUtils.closeAndRethrow(create, e);
        }
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public long pos() {
        return this.mPos;
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public int packetSize() {
        return (int) this.mPacketSize;
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public void writePacket(ByteBuf byteBuf) throws IOException {
        try {
            Preconditions.checkState(!this.mClosed, "PacketWriter is closed while writing packets.");
            int readableBytes = byteBuf.readableBytes();
            ensureReserved(this.mPos + readableBytes);
            this.mPos += readableBytes;
            Preconditions.checkState(byteBuf.readBytes(this.mWriter.getChannel(), readableBytes) == readableBytes);
            byteBuf.release();
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    @Override // alluxio.client.Cancelable
    public void cancel() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        try {
            try {
                NettyRPC.call(this.mNettyRPCContext, new ProtoMessage(Protocol.LocalBlockCompleteRequest.newBuilder().setBlockId(this.mBlockId).setCancel(true).build()));
                this.mCloser.close();
            } catch (Exception e) {
                throw this.mCloser.rethrow(e);
            }
        } catch (Throwable th) {
            this.mCloser.close();
            throw th;
        }
    }

    @Override // alluxio.client.block.stream.PacketWriter
    public void flush() {
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        this.mCloser.register(new Closeable() { // from class: alluxio.client.block.stream.LocalFilePacketWriter.2
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() throws IOException {
                NettyRPC.call(LocalFilePacketWriter.this.mNettyRPCContext, new ProtoMessage(Protocol.LocalBlockCompleteRequest.newBuilder().setBlockId(LocalFilePacketWriter.this.mBlockId).build()));
            }
        });
        this.mCloser.close();
    }

    private LocalFilePacketWriter(long j, long j2, OutStreamOptions outStreamOptions, Channel channel, LocalFileBlockWriter localFileBlockWriter, ProtoMessage protoMessage, NettyRPCContext nettyRPCContext, Closer closer) {
        this.mChannel = channel;
        this.mCloser = closer;
        this.mOptions = outStreamOptions;
        this.mWriter = localFileBlockWriter;
        this.mCreateRequest = protoMessage;
        this.mNettyRPCContext = nettyRPCContext;
        this.mPosReserved += FILE_BUFFER_BYTES;
        this.mBlockId = j;
        this.mPacketSize = j2;
    }

    private void ensureReserved(long j) throws IOException {
        if (j <= this.mPosReserved) {
            return;
        }
        long max = Math.max(j - this.mPosReserved, FILE_BUFFER_BYTES);
        NettyRPC.call(this.mNettyRPCContext, new ProtoMessage(this.mCreateRequest.asLocalBlockCreateRequest().toBuilder().setSpaceToReserve(max).setOnlyReserveSpace(true).build()));
        this.mPosReserved += max;
    }
}
