package org.apache.hadoop.hdfs.nfs.nfs3;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.nfs.nfs3.AsyncDataService;
import org.apache.hadoop.hdfs.nfs.nfs3.WriteCtx;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.nfs.nfs3.FileHandle;
import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
import org.apache.hadoop.nfs.nfs3.response.COMMIT3Response;
import org.apache.hadoop.nfs.nfs3.response.WRITE3Response;
import org.apache.hadoop.nfs.nfs3.response.WccAttr;
import org.apache.hadoop.nfs.nfs3.response.WccData;
import org.apache.hadoop.oncrpc.XDR;
import org.apache.hadoop.oncrpc.security.VerifierNone;
import org.apache.hadoop.security.IdMappingServiceProvider;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.jboss.netty.channel.Channel;

/* JADX INFO: Access modifiers changed from: package-private */
/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.class
 */
/* loaded from: input_file:hadoop-hdfs-nfs-2.7.5.0.jar:org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.class */
public class OpenFileCtx {
    /** Logger used by this class and its nested classes; initialized elsewhere (no initializer on this declaration). */
    public static final Log LOG;
    /**
     * Threshold compared against {@link #nonSequentialWriteInMemory}: at or above it the dumper
     * dumps and writers block in waitForDump(). Initialized elsewhere.
     */
    private static long DUMP_WRITE_WATER_MARK;
    /** DFS client supplied to the constructor. */
    private final DFSClient client;
    /** Id mapping provider supplied to the constructor. */
    private final IdMappingServiceProvider iug;
    /**
     * True from construction; receivedNewWrite(), checkCommitInternal(), streamCleanup() and the
     * Dumper loops all test it. The Dumper clears it when it catches a Throwable.
     */
    private volatile boolean activeState;
    /** Set by checkAndStartWrite() when a write-back task is submitted; cleared by offerNextToWrite() when nothing is pending. */
    private volatile boolean asyncStatus;
    /** Offset of the write that triggered the most recent write-back task (see checkAndStartWrite()). */
    private volatile long asyncWriteBackStartOffset;
    /** Offset that incoming writes are compared against; initialized from the size in the file attributes. */
    private AtomicLong nextOffset;
    /** HDFS output stream of the open file; hsync'ed on commit and queried for the flushed offset. */
    private final HdfsDataOutputStream fos;
    /** Constructor flag that alters the decisions taken in checkCommitInternal(). */
    private final boolean aixCompatMode;
    /** File attributes used to build WccData for responses; initially the attributes passed to the constructor. */
    private Nfs3FileAttributes latestAttr;
    /** Buffered writes keyed by offset range, ordered by OffsetRange.ReverseComparatorOnMin. */
    private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
    /** Commit requests that are waiting, keyed by the offset they wait for. */
    private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
    /** Time.monotonicNow() reading taken by updateLastAccessTime(). */
    private long lastAccessTime;
    /** True when a dump file path was supplied; the Dumper clears it after a dump-file I/O failure. */
    private volatile boolean enabledDump;
    /** Output stream on the dump file; created lazily by the Dumper. */
    private FileOutputStream dumpOut;
    /**
     * Running total of the counts of buffered writes created in the ALLOW_DUMP state, reduced by
     * whatever WriteCtx.dumpData() reports as dumped.
     */
    private AtomicLong nonSequentialWriteInMemory;
    /** Read-only random access handle on the dump file; created lazily by the Dumper. */
    private RandomAccessFile raf;
    /** Path of the dump file, or null when dumping is disabled. */
    private final String dumpFilePath;
    /** Daemon thread running the Dumper; started lazily by waitForDump(). */
    private Daemon dumpThread;
    /** Value of NfsConfigKeys.LARGE_FILE_UPLOAD (default true); consulted by checkCommitInternal(). */
    private final boolean uploadLargeFile;
    /** Compiler-generated assertion switch surfaced by the decompiler; initialized elsewhere. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$COMMIT_STATUS.class
     */
    /* loaded from: input_file:hadoop-hdfs-nfs-2.7.5.0.jar:org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$COMMIT_STATUS.class */
    /**
     * Outcome of evaluating a COMMIT request against this context, as produced by
     * checkCommitInternal() and post-processed by checkCommit().
     */
    public enum COMMIT_STATUS {
        /** Nothing left to wait for; also the result of a successful hsync in checkCommit(). */
        COMMIT_FINISHED,
        /** The commit was parked (recorded in pendingCommits unless called from a read) until data is flushed. */
        COMMIT_WAIT,
        /** The context is inactive and has no pending writes. */
        COMMIT_INACTIVE_CTX,
        /** The context is inactive but still holds pending writes. */
        COMMIT_INACTIVE_WITH_PENDING_WRITE,
        /** The flushed offset could not be obtained, or the sync failed. */
        COMMIT_ERROR,
        /** The requested offset is already flushed; checkCommit() follows up with an hsync. */
        COMMIT_DO_SYNC,
        /** Returned by handleSpecialWait(). */
        COMMIT_SPECIAL_WAIT,
        /** Returned in large-file-upload mode when checkSequential() reports the pending writes do not run contiguously up to the commit offset. */
        COMMIT_SPECIAL_SUCCESS
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$CommitCtx.class
     */
    /* loaded from: input_file:hadoop-hdfs-nfs-2.7.5.0.jar:org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$CommitCtx.class */
    /**
     * Immutable snapshot of a COMMIT request that is waiting on this file: the offset it waits
     * for, where to send the reply, and when it was created.
     */
    public static class CommitCtx {
        /** System.nanoTime() reading taken when this context was created. */
        public final long startTime = System.nanoTime();
        private final long offset;
        private final Channel channel;
        private final int xid;
        private final Nfs3FileAttributes preOpAttr;

        /* JADX INFO: Access modifiers changed from: package-private */
        public CommitCtx(long j, Channel channel, int i, Nfs3FileAttributes nfs3FileAttributes) {
            this.offset = j;
            this.xid = i;
            this.channel = channel;
            this.preOpAttr = nfs3FileAttributes;
        }

        /** @return the offset this commit waits for */
        long getOffset() {
            return offset;
        }

        /** @return the channel the reply is written to */
        Channel getChannel() {
            return channel;
        }

        /** @return the RPC transaction id of the commit request */
        int getXid() {
            return xid;
        }

        /** @return the file attributes captured before the operation */
        Nfs3FileAttributes getPreOpAttr() {
            return preOpAttr;
        }

        /** @return the creation timestamp, as returned by System.nanoTime() */
        long getStartTime() {
            return startTime;
        }

        @Override
        public String toString() {
            return String.format("offset: %d xid: %d startTime: %d", offset, xid, startTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$Dumper.class
     */
    /* loaded from: input_file:hadoop-hdfs-nfs-2.7.5.0.jar:org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx$Dumper.class */
    public class Dumper implements Runnable {
        Dumper() {
        }

        private void dump() {
            if (OpenFileCtx.this.dumpOut == null) {
                OpenFileCtx.LOG.info("Create dump file: " + OpenFileCtx.this.dumpFilePath);
                File file = new File(OpenFileCtx.this.dumpFilePath);
                try {
                    synchronized (this) {
                        Preconditions.checkState(file.createNewFile(), "The dump file should not exist: %s", new Object[]{OpenFileCtx.this.dumpFilePath});
                        OpenFileCtx.this.dumpOut = new FileOutputStream(file);
                    }
                } catch (IOException e) {
                    OpenFileCtx.LOG.error("Got failure when creating dump stream " + OpenFileCtx.this.dumpFilePath, e);
                    OpenFileCtx.this.enabledDump = false;
                    if (OpenFileCtx.this.dumpOut != null) {
                        try {
                            OpenFileCtx.this.dumpOut.close();
                            return;
                        } catch (IOException e2) {
                            OpenFileCtx.LOG.error("Can't close dump stream " + OpenFileCtx.this.dumpFilePath, e);
                            return;
                        }
                    }
                    return;
                }
            }
            if (OpenFileCtx.this.raf == null) {
                try {
                    OpenFileCtx.this.raf = new RandomAccessFile(OpenFileCtx.this.dumpFilePath, "r");
                } catch (FileNotFoundException e3) {
                    OpenFileCtx.LOG.error("Can't get random access to file " + OpenFileCtx.this.dumpFilePath);
                    OpenFileCtx.this.enabledDump = false;
                    return;
                }
            }
            if (OpenFileCtx.LOG.isDebugEnabled()) {
                OpenFileCtx.LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == " + OpenFileCtx.this.nonSequentialWriteInMemory.get());
            }
            Iterator it = OpenFileCtx.this.pendingWrites.keySet().iterator();
            while (OpenFileCtx.this.activeState && it.hasNext() && OpenFileCtx.this.nonSequentialWriteInMemory.get() > 0) {
                WriteCtx writeCtx = (WriteCtx) OpenFileCtx.this.pendingWrites.get((OffsetRange) it.next());
                if (writeCtx != null) {
                    try {
                        long dumpData = writeCtx.dumpData(OpenFileCtx.this.dumpOut, OpenFileCtx.this.raf);
                        if (dumpData > 0) {
                            OpenFileCtx.this.updateNonSequentialWriteInMemory(-dumpData);
                        }
                    } catch (IOException e4) {
                        OpenFileCtx.LOG.error("Dump data failed: " + writeCtx + " with error: " + e4 + " OpenFileCtx state: " + OpenFileCtx.this.activeState);
                        OpenFileCtx.this.enabledDump = false;
                        return;
                    }
                }
            }
            if (OpenFileCtx.LOG.isDebugEnabled()) {
                OpenFileCtx.LOG.debug("After dump, nonSequentialWriteInMemory == " + OpenFileCtx.this.nonSequentialWriteInMemory.get());
            }
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            while (OpenFileCtx.this.activeState && OpenFileCtx.this.enabledDump) {
                try {
                    if (OpenFileCtx.this.nonSequentialWriteInMemory.get() >= OpenFileCtx.DUMP_WRITE_WATER_MARK) {
                        dump();
                    }
                    synchronized (OpenFileCtx.this) {
                        if (OpenFileCtx.this.nonSequentialWriteInMemory.get() < OpenFileCtx.DUMP_WRITE_WATER_MARK) {
                            OpenFileCtx.this.notifyAll();
                            try {
                                OpenFileCtx.this.wait();
                                if (OpenFileCtx.LOG.isDebugEnabled()) {
                                    OpenFileCtx.LOG.debug("Dumper woke up");
                                }
                            } catch (InterruptedException e) {
                                OpenFileCtx.LOG.info("Dumper is interrupted, dumpFilePath= " + OpenFileCtx.this.dumpFilePath);
                            }
                        }
                    }
                    if (OpenFileCtx.LOG.isDebugEnabled()) {
                        OpenFileCtx.LOG.debug("Dumper checking OpenFileCtx activeState: " + OpenFileCtx.this.activeState + " enabledDump: " + OpenFileCtx.this.enabledDump);
                    }
                } catch (Throwable th) {
                    synchronized (OpenFileCtx.this) {
                        OpenFileCtx.this.notifyAll();
                        OpenFileCtx.LOG.info("Dumper get Throwable: " + th + ". dumpFilePath: " + OpenFileCtx.this.dumpFilePath, th);
                        OpenFileCtx.this.activeState = false;
                    }
                }
            }
        }
    }

    /** Records the current Time.monotonicNow() reading as the last access time. */
    private void updateLastAccessTime() {
        lastAccessTime = Time.monotonicNow();
    }

    /**
     * Tells whether more than {@code j} has elapsed on the monotonic clock since the last access.
     *
     * @param j timeout, in the units of Time.monotonicNow()
     * @return true when the elapsed time is strictly greater than {@code j}
     */
    private boolean checkStreamTimeout(long j) {
        long elapsed = Time.monotonicNow() - lastAccessTime;
        return elapsed > j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the monotonic-clock reading stored by the last call to updateLastAccessTime() */
    public long getLastAccessTime() {
        return lastAccessTime;
    }

    /** @return the current value of the nextOffset counter */
    public long getNextOffset() {
        return nextOffset.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return true while this context is still active */
    public boolean getActiveState() {
        return activeState;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether any write or commit is still queued on this context.
     *
     * <p>Uses {@code isEmpty()} rather than {@code size()}: both maps are created as
     * ConcurrentSkipListMap instances, whose {@code size()} has to traverse the elements.
     *
     * @return true when pendingWrites or pendingCommits holds at least one entry
     */
    public boolean hasPendingWork() {
        return !this.pendingWrites.isEmpty() || !this.pendingCommits.isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds {@code j} (which may be negative) to the in-memory counter and verifies that the
     * result did not go below zero.
     *
     * @param j delta to apply
     * @return the counter value after the update
     * @throws IllegalStateException if the updated value is negative
     */
    public long updateNonSequentialWriteInMemory(long j) {
        final long updated = nonSequentialWriteInMemory.addAndGet(j);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Update nonSequentialWriteInMemory by " + j + " new value: " + updated);
        }
        boolean nonNegative = updated >= 0;
        Preconditions.checkState(nonNegative, "nonSequentialWriteInMemory is negative " + updated + " after update with count " + j);
        return updated;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Convenience constructor: delegates to the full constructor with AIX compatibility mode
     * off and a freshly created NfsConfiguration.
     */
    public OpenFileCtx(HdfsDataOutputStream hdfsDataOutputStream, Nfs3FileAttributes nfs3FileAttributes, String str, DFSClient dFSClient, IdMappingServiceProvider idMappingServiceProvider) {
        this(
            hdfsDataOutputStream,
            nfs3FileAttributes,
            str,
            dFSClient,
            idMappingServiceProvider,
            false,
            new NfsConfiguration());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the context for an open file.
     *
     * @param hdfsDataOutputStream output stream of the file
     * @param nfs3FileAttributes initial attributes; the size seeds nextOffset
     * @param str dump file path, or null to disable dumping
     * @param dFSClient DFS client kept by the context
     * @param idMappingServiceProvider id mapping provider kept by the context
     * @param z AIX compatibility mode flag
     * @param nfsConfiguration configuration from which NfsConfigKeys.LARGE_FILE_UPLOAD is read
     */
    public OpenFileCtx(HdfsDataOutputStream hdfsDataOutputStream, Nfs3FileAttributes nfs3FileAttributes, String str, DFSClient dFSClient, IdMappingServiceProvider idMappingServiceProvider, boolean z, NfsConfiguration nfsConfiguration) {
        // Collaborators and immutable settings.
        fos = hdfsDataOutputStream;
        latestAttr = nfs3FileAttributes;
        aixCompatMode = z;

        // Queues of outstanding work.
        pendingWrites = new ConcurrentSkipListMap<OffsetRange, WriteCtx>(OffsetRange.ReverseComparatorOnMin);
        pendingCommits = new ConcurrentSkipListMap<Long, CommitCtx>();

        // Lifecycle and write-back state.
        updateLastAccessTime();
        activeState = true;
        asyncStatus = false;
        asyncWriteBackStartOffset = 0L;

        // Dump state: streams and thread are created lazily.
        dumpOut = null;
        raf = null;
        nonSequentialWriteInMemory = new AtomicLong(0L);
        dumpFilePath = str;
        enabledDump = (str != null);

        // The next expected offset starts at the current file size.
        nextOffset = new AtomicLong();
        nextOffset.set(nfs3FileAttributes.getSize());
        if (!$assertionsDisabled) {
            boolean alignedWithStream = nextOffset.get() == fos.getPos();
            if (!alignedWithStream) {
                throw new AssertionError();
            }
        }

        dumpThread = null;
        client = dFSClient;
        iug = idMappingServiceProvider;
        uploadLargeFile = nfsConfiguration.getBoolean(NfsConfigKeys.LARGE_FILE_UPLOAD, true);
    }

    /** @return the file attributes currently held by this context */
    public Nfs3FileAttributes getLatestAttr() {
        return latestAttr;
    }

    /**
     * Reads the current position of the output stream.
     *
     * @return the value of {@code fos.getPos()}
     * @throws IOException declared for callers, which treat it as a failure to get the offset
     */
    private long getFlushedOffset() throws IOException {
        return fos.getPos();
    }

    /**
     * Blocks the calling writer while the in-memory counter is at or above the water mark,
     * starting the dumper thread on first use and waking it otherwise.
     *
     * <p>The wait ends as soon as the counter drops below the water mark, the context becomes
     * inactive, or dumping gets disabled. In the last two cases nobody will reduce the counter
     * any further, so continuing to wait could block the writer indefinitely. An interrupt
     * received while waiting does not abort the wait, but the thread's interrupt status is
     * restored before returning so that callers can still observe it.
     */
    private void waitForDump() {
        if (!this.enabledDump) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Do nothing, dump is disabled.");
            }
            return;
        }
        // Cheap unsynchronized check first; it is repeated under the monitor below.
        if (this.nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
            return;
        }
        boolean interrupted = false;
        synchronized (this) {
            if (this.nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Asking dumper to dump...");
                }
                if (this.dumpThread == null) {
                    this.dumpThread = new Daemon(new Dumper());
                    this.dumpThread.start();
                } else {
                    notifyAll();
                }
            }
            while (this.activeState && this.enabledDump && this.nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Keep waiting for the dump, but remember the interrupt.
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Looks up a buffered write covering exactly the same offset range as the request.
     *
     * @param wRITE3Request incoming write request
     * @param channel unused here
     * @param i xid of the incoming request; a mismatch with the buffered write is logged
     * @return the buffered write with the same range, or null when there is none
     */
    private WriteCtx checkRepeatedWriteRequest(WRITE3Request wRITE3Request, Channel channel, int i) {
        final long start = wRITE3Request.getOffset();
        final long end = wRITE3Request.getOffset() + wRITE3Request.getCount();
        WriteCtx buffered = this.pendingWrites.get(new OffsetRange(start, end));
        if (buffered != null && i != buffered.getXid()) {
            LOG.warn("Got a repeated request, same range, with a different xid: " + i + " xid in old request: " + buffered.getXid());
        }
        return buffered;
    }

    /**
     * Entry point for an incoming WRITE request.
     *
     * <ul>
     *   <li>Inactive context: replies with status 5 (NFS3ERR_IO in NFSv3) and returns.</li>
     *   <li>New range: hands the request to receivedNewWriteInternal().</li>
     *   <li>Repeated range that was already answered: resends a success response.</li>
     *   <li>Repeated range not answered yet: drops the request.</li>
     * </ul>
     */
    public void receivedNewWrite(DFSClient dFSClient, WRITE3Request wRITE3Request, Channel channel, int i, AsyncDataService asyncDataService, IdMappingServiceProvider idMappingServiceProvider) {
        if (!this.activeState) {
            LOG.info("OpenFileCtx is inactive, fileId: " + wRITE3Request.getHandle().getFileId());
            WccData failureWcc = new WccData(this.latestAttr.getWccAttr(), this.latestAttr);
            WRITE3Response failure = new WRITE3Response(5, failureWcc, 0, wRITE3Request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
            Nfs3Utils.writeChannel(channel, failure.serialize(new XDR(), i, new VerifierNone()), i);
            return;
        }

        updateLastAccessTime();
        WriteCtx existing = checkRepeatedWriteRequest(wRITE3Request, channel, i);
        if (existing == null) {
            receivedNewWriteInternal(dFSClient, wRITE3Request, channel, i, asyncDataService, idMappingServiceProvider);
            return;
        }

        if (!existing.getReplied()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Repeated write request which hasn't been served: xid=" + i + ", drop it.");
            }
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Repeated write request which is already served: xid=" + i + ", resend response.");
        }
        WccData successWcc = new WccData(this.latestAttr.getWccAttr(), this.latestAttr);
        WRITE3Response success = new WRITE3Response(0, successWcc, wRITE3Request.getCount(), wRITE3Request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
        Nfs3Utils.writeChannel(channel, success.serialize(new XDR(), i, new VerifierNone()), i);
    }

    /**
     * Trims a write request that starts before {@code j} and ends after it so that only the part
     * from {@code j} onwards remains: advances the data buffer position and rewrites the
     * request's offset and count in place.
     *
     * @param wRITE3Request request to modify
     * @param j offset at which the retained data starts
     * @throws IllegalStateException if the buffer position is not zero on entry, or if the
     *         remaining bytes do not match the new count
     */
    @VisibleForTesting
    public static void alterWriteRequest(WRITE3Request wRITE3Request, long j) {
        final long originalOffset = wRITE3Request.getOffset();
        final int originalCount = wRITE3Request.getCount();
        final long lastByte = (originalOffset + originalCount) - 1;
        final long retainedCount = (originalOffset + originalCount) - j;
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Got overwrite with appended data (%d-%d), current offset %d, drop the overlapped section (%d-%d) and append new data (%d-%d).", originalOffset, lastByte, j, originalOffset, j - 1, j, lastByte));
        }

        ByteBuffer payload = wRITE3Request.getData();
        Preconditions.checkState(payload.position() == 0, "The write request data has non-zero position");
        // Skip the bytes that overlap data before offset j.
        payload.position((int) (j - originalOffset));
        long remaining = payload.limit() - payload.position();
        Preconditions.checkState(remaining == retainedCount, "The write request buffer has wrong limit/position regarding count");

        wRITE3Request.setOffset(j);
        wRITE3Request.setCount((int) retainedCount);
    }

    /**
     * Wraps an incoming write in a WriteCtx and queues it in pendingWrites.
     *
     * <p>A request that starts before nextOffset but extends past it is trimmed to its appended
     * part, provided no other write is pending. A request that lies entirely before nextOffset is
     * rejected. A write starting exactly at nextOffset is created as NO_DUMP, any later one as
     * ALLOW_DUMP.
     *
     * <p>The in-memory counter is only increased for an ALLOW_DUMP write that is actually
     * inserted. A request whose range is already queued is not inserted, so counting it would
     * raise the counter by an amount that nothing ever subtracts again.
     *
     * @param wRITE3Request incoming request; may be modified by alterWriteRequest()
     * @param channel channel the request arrived on
     * @param i xid of the request
     * @return the WriteCtx created for the request, or null when the request has to be treated as
     *         an overwrite by the caller
     */
    private synchronized WriteCtx addWritesToCache(WRITE3Request wRITE3Request, Channel channel, int i) {
        long offset = wRITE3Request.getOffset();
        int count = wRITE3Request.getCount();
        long j = this.nextOffset.get();
        // Count before trimming; stays -1 when the request was not altered.
        int originalCount = -1;
        if (LOG.isDebugEnabled()) {
            LOG.debug("requested offset=" + offset + " and current offset=" + j);
        }
        if (offset < j && offset + count > j) {
            LOG.warn(String.format("Got overwrite with appended data (%d-%d), current offset %d, drop the overlapped section (%d-%d) and append new data (%d-%d).", Long.valueOf(offset), Long.valueOf((offset + count) - 1), Long.valueOf(j), Long.valueOf(offset), Long.valueOf(j - 1), Long.valueOf(j), Long.valueOf((offset + count) - 1)));
            if (!this.pendingWrites.isEmpty()) {
                LOG.warn("There are other pending writes, fail this jumbo write");
                return null;
            }
            LOG.warn("Modify this write to write only the appended data");
            alterWriteRequest(wRITE3Request, j);
            originalCount = count;
            offset = wRITE3Request.getOffset();
            count = wRITE3Request.getCount();
        }
        if (offset < j) {
            LOG.warn("(offset,count,nextOffset): (" + offset + "," + count + "," + this.nextOffset + ")");
            return null;
        }
        WriteCtx.DataState dataState = offset == j ? WriteCtx.DataState.NO_DUMP : WriteCtx.DataState.ALLOW_DUMP;
        WriteCtx writeCtx = new WriteCtx(wRITE3Request.getHandle(), wRITE3Request.getOffset(), wRITE3Request.getCount(), originalCount, wRITE3Request.getStableHow(), wRITE3Request.getData(), channel, i, false, dataState);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Add new write to the list with nextOffset " + j + " and requested offset=" + offset);
        }
        if (checkRepeatedWriteRequest(wRITE3Request, channel, i) == null) {
            // Account for the data BEFORE publishing the write: the dumper reads pendingWrites
            // without holding this monitor and subtracts what it dumps, so the counter must
            // already include this write by the time it becomes visible.
            if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
                updateNonSequentialWriteInMemory(count);
            }
            this.pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
            if (LOG.isDebugEnabled()) {
                LOG.debug("New write buffered with xid " + i + " nextOffset " + j + " req offset=" + offset + " mapsize=" + this.pendingWrites.size());
            }
        } else {
            LOG.warn("Got a repeated request, same range, with xid: " + i + " nextOffset " + j + " req offset=" + offset);
        }
        return writeCtx;
    }

    /**
     * Answers a write that addWritesToCache() refused to buffer. A request that reaches past
     * nextOffset is rejected with status 22 (NFS3ERR_INVAL in NFSv3); anything else is checked
     * by processPerfectOverWrite(). The response is written to the channel in both cases.
     */
    private void processOverWrite(DFSClient dFSClient, WRITE3Request wRITE3Request, Channel channel, int i, IdMappingServiceProvider idMappingServiceProvider) {
        final WccData wcc = new WccData(this.latestAttr.getWccAttr(), (Nfs3FileAttributes) null);
        final long start = wRITE3Request.getOffset();
        final int length = wRITE3Request.getCount();
        final Nfs3Constant.WriteStableHow requestedStability = wRITE3Request.getStableHow();

        final WRITE3Response reply;
        boolean extendsPastEnd = start + length > this.nextOffset.get();
        if (!extendsPastEnd) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Process perfectOverWrite");
            }
            byte[] payload = wRITE3Request.getData().array();
            String path = Nfs3Utils.getFileIdPath(wRITE3Request.getHandle());
            reply = processPerfectOverWrite(dFSClient, start, length, requestedStability, payload, path, wcc, idMappingServiceProvider);
        } else {
            LOG.warn("Treat this jumbo write as a real random write, no support.");
            reply = new WRITE3Response(22, wcc, 0, Nfs3Constant.WriteStableHow.UNSTABLE, Nfs3Constant.WRITE_COMMIT_VERF);
        }

        updateLastAccessTime();
        Nfs3Utils.writeChannel(channel, reply.serialize(new XDR(), i, new VerifierNone()), i);
    }

    /**
     * Starts the asynchronous write-back task when the given write begins exactly at nextOffset
     * and no task is already flagged as running.
     *
     * @param asyncDataService executor the write-back task is submitted to
     * @param writeCtx the write that was just buffered
     * @return false when the write does not start at nextOffset; true otherwise, whether a task
     *         was submitted now or one was already running
     */
    private synchronized boolean checkAndStartWrite(AsyncDataService asyncDataService, WriteCtx writeCtx) {
        boolean startsAtNextOffset = writeCtx.getOffset() == this.nextOffset.get();
        if (!startsAtNextOffset) {
            return false;
        }

        if (this.asyncStatus) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("The write back thread is working.");
            }
            return true;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Trigger the write back task. Current nextOffset: " + this.nextOffset.get());
        }
        this.asyncStatus = true;
        this.asyncWriteBackStartOffset = writeCtx.getOffset();
        asyncDataService.execute(new AsyncDataService.WriteBackTask(this));
        return true;
    }

    /**
     * Handles a write whose range is not already buffered.
     *
     * <p>The request is buffered first. If it cannot be buffered it is processed as an overwrite.
     * If it starts at nextOffset, the write-back task takes over and nothing is sent from here.
     * Otherwise the method waits for the dumper if necessary and answers with an UNSTABLE
     * success response.
     */
    private void receivedNewWriteInternal(DFSClient dFSClient, WRITE3Request wRITE3Request, Channel channel, int i, AsyncDataService asyncDataService, IdMappingServiceProvider idMappingServiceProvider) {
        // Capture these before buffering: addWritesToCache() may rewrite the request's count.
        Nfs3Constant.WriteStableHow replyStability = wRITE3Request.getStableHow();
        final WccAttr preOpAttr = this.latestAttr.getWccAttr();
        final int requestedCount = wRITE3Request.getCount();

        final WriteCtx buffered = addWritesToCache(wRITE3Request, channel, i);
        if (buffered == null) {
            processOverWrite(dFSClient, wRITE3Request, channel, i, idMappingServiceProvider);
            return;
        }

        boolean handedToWriteBack = checkAndStartWrite(asyncDataService, buffered);
        if (handedToWriteBack) {
            return;
        }

        waitForDump();
        if (replyStability != Nfs3Constant.WriteStableHow.UNSTABLE) {
            LOG.info("Have to change stable write to unstable write: " + wRITE3Request.getStableHow());
            replyStability = Nfs3Constant.WriteStableHow.UNSTABLE;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("UNSTABLE write request, send response for offset: " + buffered.getOffset());
        }

        WccData wcc = new WccData(preOpAttr, this.latestAttr);
        WRITE3Response reply = new WRITE3Response(0, wcc, requestedCount, replyStability, Nfs3Constant.WRITE_COMMIT_VERF);
        RpcProgramNfs3.metrics.addWrite(Nfs3Utils.getElapsedTime(buffered.startTime));
        Nfs3Utils.writeChannel(channel, reply.serialize(new XDR(), i, new VerifierNone()), i);
        buffered.setReplied(true);
    }

    /**
     * Handles a write that lies entirely within data already written. The data is synced, the
     * same range is read back from the file and compared with the request.
     *
     * <ul>
     *   <li>Identical content: the modification time is updated and a success response with
     *       refreshed post-op attributes is returned.</li>
     *   <li>Different content: status 22 (NFS3ERR_INVAL in NFSv3).</li>
     *   <li>Sync, read, short read or attribute failure: status 5 (NFS3ERR_IO in NFSv3).</li>
     * </ul>
     *
     * @param dFSClient client used to read the file back and update its times
     * @param j offset of the write
     * @param i number of bytes of the write
     * @param writeStableHow stability level echoed in the response
     * @param bArr data of the request; the first {@code i} bytes are compared
     * @param str path of the file
     * @param wccData WccData put in the response; its post-op attributes are set on success
     * @param idMappingServiceProvider id mapping provider used to fetch file attributes
     * @return the response to send to the client
     */
    private WRITE3Response processPerfectOverWrite(DFSClient dFSClient, long j, int i, Nfs3Constant.WriteStableHow writeStableHow, byte[] bArr, String str, WccData wccData, IdMappingServiceProvider idMappingServiceProvider) {
        try {
            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
        } catch (ClosedChannelException e) {
            // A closed stream is not an error here; the data can still be read back.
            LOG.info("The FSDataOutputStream has been closed. Continue processing the perfect overwrite.");
        } catch (IOException e) {
            LOG.info("hsync failed when processing possible perfect overwrite, path=" + str + " error: " + e);
            return new WRITE3Response(5, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        }

        // Read the existing bytes of the overwritten range.
        byte[] existing = new byte[i];
        int readCount;
        org.apache.hadoop.fs.FSDataInputStream in = null;
        try {
            in = dFSClient.createWrappedInputStream(dFSClient.open(str));
            readCount = in.read(j, existing, 0, i);
            if (readCount < i) {
                LOG.error("Can't read back " + i + " bytes, partial read size: " + readCount);
                return new WRITE3Response(5, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
            }
        } catch (IOException e) {
            LOG.info("Read failed when processing possible perfect overwrite, path=" + str, e);
            return new WRITE3Response(5, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        } finally {
            IOUtils.cleanup(LOG, in);
        }

        if (new BytesWritable.Comparator().compare(existing, 0, readCount, bArr, 0, i) != 0) {
            LOG.info("Perfect overwrite has different content");
            return new WRITE3Response(22, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        }

        LOG.info("Perfect overwrite has same content, updating the mtime, then return success");
        try {
            // The modification time is a wall-clock timestamp. Time.monotonicNow() counts from an
            // arbitrary origin and is only meaningful for measuring intervals.
            dFSClient.setTimes(str, Time.now(), -1L);
            wccData.setPostOpAttr(Nfs3Utils.getFileAttr(dFSClient, str, idMappingServiceProvider));
        } catch (IOException e) {
            LOG.info("Got error when processing perfect overwrite, path=" + str + " error: " + e);
            return new WRITE3Response(5, wccData, 0, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
        }
        return new WRITE3Response(0, wccData, i, writeStableHow, Nfs3Constant.WRITE_COMMIT_VERF);
    }

    /**
     * Evaluates a commit for offset {@code j} and, when the data is ready (COMMIT_DO_SYNC or
     * COMMIT_FINISHED), syncs the output stream.
     *
     * @param dFSClient not used by this method
     * @param j commit offset; must not be negative
     * @param channel channel of the commit request; must be non-null unless {@code z} is true
     * @param i xid of the commit request
     * @param nfs3FileAttributes pre-op attributes; must be non-null unless {@code z} is true
     * @param z true when the check is made on behalf of a read
     * @return the resulting commit status
     */
    public COMMIT_STATUS checkCommit(DFSClient dFSClient, long j, Channel channel, int i, Nfs3FileAttributes nfs3FileAttributes, boolean z) {
        if (!z) {
            Preconditions.checkState(channel != null && nfs3FileAttributes != null);
            updateLastAccessTime();
        }
        Preconditions.checkState(j >= 0);

        final COMMIT_STATUS decided = checkCommitInternal(j, channel, i, nfs3FileAttributes, z);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got commit status: " + decided.name());
        }

        boolean readyToSync = decided == COMMIT_STATUS.COMMIT_DO_SYNC || decided == COMMIT_STATUS.COMMIT_FINISHED;
        if (!readyToSync) {
            return decided;
        }

        try {
            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
            return COMMIT_STATUS.COMMIT_FINISHED;
        } catch (ClosedChannelException e) {
            // A closed stream only counts as success when nothing is left to write.
            if (this.pendingWrites.isEmpty()) {
                return COMMIT_STATUS.COMMIT_FINISHED;
            }
            return COMMIT_STATUS.COMMIT_ERROR;
        } catch (IOException e) {
            LOG.error("Got stream error during data sync: " + e);
            return COMMIT_STATUS.COMMIT_ERROR;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Checks whether the pending writes, visited in descending key order, form a contiguous
     * chain starting at {@code j2} that reaches beyond {@code j}.
     *
     * @param j commit offset
     * @param j2 offset at which the chain must start
     * @return true when a gap-free chain extends past {@code j}; false on the first gap or when
     *         the writes end before passing {@code j}
     * @throws IllegalStateException if {@code j} is smaller than {@code j2}
     */
    @VisibleForTesting
    public synchronized boolean checkSequential(long j, long j2) {
        Preconditions.checkState(j >= j2, "commitOffset " + j + " less than nextOffset " + j2);
        long expectedStart = j2;
        Iterator<OffsetRange> ranges = this.pendingWrites.descendingKeySet().iterator();
        while (ranges.hasNext()) {
            OffsetRange range = ranges.next();
            if (range.getMin() != expectedStart) {
                // Gap between the previous range and this one.
                return false;
            }
            expectedStart = range.getMax();
            if (expectedStart > j) {
                return true;
            }
        }
        return false;
    }

    /**
     * Registers the commit in pendingCommits under offset {@code j}, unless the check is made on
     * behalf of a read, and reports COMMIT_SPECIAL_WAIT.
     *
     * @param z true when called from a read; nothing is registered in that case
     * @return always COMMIT_STATUS.COMMIT_SPECIAL_WAIT
     */
    private COMMIT_STATUS handleSpecialWait(boolean z, long j, Channel channel, int i, Nfs3FileAttributes nfs3FileAttributes) {
        if (!z) {
            CommitCtx waiting = new CommitCtx(j, channel, i, nfs3FileAttributes);
            this.pendingCommits.put(j, waiting);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("return COMMIT_SPECIAL_WAIT");
        }
        return COMMIT_STATUS.COMMIT_SPECIAL_WAIT;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decides how a commit for offset {@code j} has to be handled, based on the flushed offset of
     * the output stream, nextOffset, the pending writes and the two mode flags (aixCompatMode,
     * uploadLargeFile). Does not sync anything itself.
     *
     * @param j commit offset; a value of 0 or less makes the method derive the offset from the
     *          first entry of pendingWrites
     * @param channel channel stored with a parked commit
     * @param i xid stored with a parked commit
     * @param nfs3FileAttributes pre-op attributes stored with a parked commit
     * @param z true when called on behalf of a read; nothing is added to pendingCommits then
     * @return the commit status; COMMIT_ERROR when the flushed offset cannot be read
     */
    @VisibleForTesting
    public synchronized COMMIT_STATUS checkCommitInternal(long j, Channel channel, int i, Nfs3FileAttributes nfs3FileAttributes, boolean z) {
        // An inactive context only reports whether writes are still queued.
        if (!this.activeState) {
            return this.pendingWrites.isEmpty() ? COMMIT_STATUS.COMMIT_INACTIVE_CTX : COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE;
        }
        try {
            long flushedOffset = getFlushedOffset();
            if (LOG.isDebugEnabled()) {
                LOG.debug("getFlushedOffset=" + flushedOffset + " commitOffset=" + j + "nextOffset=" + this.nextOffset.get());
            }
            // Case 1: nothing queued.
            if (this.pendingWrites.isEmpty()) {
                // Outside AIX mode, a flushed offset behind nextOffset means data is still being
                // written, so the commit waits for nextOffset.
                if (!this.aixCompatMode && flushedOffset < this.nextOffset.get()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get commit while still writing to the requested offset, with empty queue");
                    }
                    return handleSpecialWait(z, this.nextOffset.get(), channel, i, nfs3FileAttributes);
                }
                return COMMIT_STATUS.COMMIT_FINISHED;
            }
            Preconditions.checkState(flushedOffset <= this.nextOffset.get(), "flushed " + flushedOffset + " is larger than nextOffset " + this.nextOffset.get());
            // Case 2: large-file-upload mode (not combined with AIX mode).
            if (this.uploadLargeFile && !this.aixCompatMode) {
                // Without an explicit offset, use the last byte of the first pendingWrites entry.
                long max = j > 0 ? j : this.pendingWrites.firstEntry().getKey().getMax() - 1;
                if (max <= flushedOffset) {
                    return COMMIT_STATUS.COMMIT_DO_SYNC;
                }
                if (max < this.nextOffset.get()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("get commit while still writing to the requested offset");
                    }
                    return handleSpecialWait(z, max, channel, i, nfs3FileAttributes);
                }
                // The target is at or beyond nextOffset: wait only if the queued writes run
                // contiguously from nextOffset past the target.
                if (checkSequential(max, this.nextOffset.get())) {
                    return handleSpecialWait(z, max, channel, i, nfs3FileAttributes);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("return COMMIT_SPECIAL_SUCCESS");
                }
                return COMMIT_STATUS.COMMIT_SPECIAL_SUCCESS;
            }
            // Case 3: explicit commit offset.
            if (j > 0) {
                if (!this.aixCompatMode) {
                    if (j <= flushedOffset) {
                        return COMMIT_STATUS.COMMIT_DO_SYNC;
                    }
                    if (!z) {
                        this.pendingCommits.put(Long.valueOf(j), new CommitCtx(j, channel, i, nfs3FileAttributes));
                    }
                    return COMMIT_STATUS.COMMIT_WAIT;
                }
                // AIX mode: an already-flushed offset is synced; otherwise fall through to the
                // generic wait below.
                if (j <= flushedOffset) {
                    return COMMIT_STATUS.COMMIT_DO_SYNC;
                }
            }
            // Case 4: park the commit on the last byte of the first pendingWrites entry.
            Map.Entry<OffsetRange, WriteCtx> firstEntry = this.pendingWrites.firstEntry();
            if (!z) {
                long max2 = firstEntry.getKey().getMax() - 1;
                Preconditions.checkState(max2 > 0);
                this.pendingCommits.put(Long.valueOf(max2), new CommitCtx(max2, channel, i, nfs3FileAttributes));
            }
            return COMMIT_STATUS.COMMIT_WAIT;
        } catch (IOException e) {
            LOG.error("Can't get flushed offset, error:" + e);
            return COMMIT_STATUS.COMMIT_ERROR;
        }
    }

    /**
     * Reports whether the stream behind this open-file context may be closed.
     *
     * @param j  file id; only used in the debug log message
     * @param j2 stream timeout passed to {@code checkStreamTimeout}; must not be smaller than
     *           {@code NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT}
     * @return {@code true} if the context is already inactive or {@code checkStreamTimeout(j2)}
     *         reports a timeout, {@code false} otherwise
     * @throws IllegalStateException if {@code j2} is below the minimum timeout
     */
    public synchronized boolean streamCleanup(long j, long j2) {
        Preconditions.checkState(j2 >= NfsConfigKeys.DFS_NFS_STREAM_TIMEOUT_MIN_DEFAULT);
        // An inactive context has nothing left to keep open.
        if (!this.activeState) {
            return true;
        }
        if (!checkStreamTimeout(j2)) {
            return false;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("stream can be closed for fileId: " + j);
        }
        return true;
    }

    /**
     * Picks the next pending write that can be applied at {@code nextOffset}, if any.
     *
     * <p>The candidate is the last entry of {@code pendingWrites}
     * (NOTE(review): presumably the map is ordered so that this is the lowest offset — confirm
     * against the map's comparator). Three outcomes are possible:
     * <ul>
     *   <li>the candidate starts beyond {@code nextOffset}: nothing is writable yet,
     *       {@code asyncStatus} is cleared and {@code null} is returned;</li>
     *   <li>the candidate straddles {@code nextOffset}: it is removed from the queue and dropped,
     *       and {@code null} is returned;</li>
     *   <li>otherwise the candidate is removed, {@code nextOffset} is advanced by its count and
     *       the write is returned.</li>
     * </ul>
     * Whenever {@code null} is returned, {@code processCommits} is run first with the current
     * {@code nextOffset}.
     *
     * @return the write to perform next, or {@code null} when none can be performed now
     */
    private synchronized WriteCtx offerNextToWrite() {
        if (this.pendingWrites.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("The async write task has no pending writes, fileId: " + this.latestAttr.getFileId());
            }
            processCommits(this.nextOffset.get());
            this.asyncStatus = false;
            return null;
        }

        final Map.Entry<OffsetRange, WriteCtx> candidate = this.pendingWrites.lastEntry();
        final OffsetRange range = candidate.getKey();
        final WriteCtx toWrite = candidate.getValue();
        if (LOG.isTraceEnabled()) {
            LOG.trace("range.getMin()=" + range.getMin() + " nextOffset=" + this.nextOffset);
        }

        final long expectedOffset = this.nextOffset.get();
        WriteCtx selected = null;
        if (range.getMin() > expectedOffset) {
            // Gap before the candidate: wait for the missing write to arrive.
            if (LOG.isDebugEnabled()) {
                LOG.debug("The next sequential write has not arrived yet");
            }
            processCommits(this.nextOffset.get());
            this.asyncStatus = false;
        } else if (range.getMin() < expectedOffset && range.getMax() > expectedOffset) {
            // Candidate overlaps data that was already written: discard it.
            LOG.warn("Got an overlapping write (" + range.getMin() + ", " + range.getMax() + "), nextOffset=" + expectedOffset + ". Silently drop it now");
            this.pendingWrites.remove(range);
            processCommits(this.nextOffset.get());
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax() + ") from the list");
            }
            this.pendingWrites.remove(range);
            this.nextOffset.addAndGet(toWrite.getCount());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Change nextOffset to " + this.nextOffset.get());
            }
            selected = toWrite;
        }
        return selected;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Drains the pending-write queue: repeatedly asks {@code offerNextToWrite()} for the next
     * write and performs it with {@code doSingleWrite}, until the context becomes inactive or no
     * write is currently available.
     *
     * <p>On every exit path, including an exception thrown by {@code offerNextToWrite()},
     * {@code doSingleWrite} or {@code updateLastAccessTime()}, {@code asyncStatus} is reset to
     * {@code false} under this object's monitor, unless {@code asyncWriteBackStartOffset} changed
     * while this call was running. In that case the flag is left untouched and the situation is
     * logged.
     *
     * @throws IllegalStateException if {@code asyncStatus} is {@code false} on entry
     */
    public void executeWriteBack() {
        Preconditions.checkState(this.asyncStatus, "openFileCtx has false asyncStatus, fileId: " + this.latestAttr.getFileId());
        // Snapshot used afterwards to detect whether another write-back task was started meanwhile.
        final long startOffset = this.asyncWriteBackStartOffset;
        try {
            while (this.activeState) {
                final WriteCtx toWrite = offerNextToWrite();
                if (toWrite == null) {
                    break;
                }
                doSingleWrite(toWrite);
                updateLastAccessTime();
            }
            if (!this.activeState && LOG.isDebugEnabled()) {
                LOG.debug("The openFileCtx is not active anymore, fileId: " + this.latestAttr.getFileId());
            }
        } finally {
            // A single finally replaces the duplicated reset logic and also covers failures
            // raised while selecting the next write, which previously left asyncStatus set.
            synchronized (this) {
                if (startOffset == this.asyncWriteBackStartOffset) {
                    this.asyncStatus = false;
                } else {
                    LOG.info("Another async task is already started before this one is finalized. fileId: " + this.latestAttr.getFileId() + " asyncStatus: " + this.asyncStatus + " original startOffset: " + startOffset + " new startOffset: " + this.asyncWriteBackStartOffset + ". Won't change asyncStatus here.");
                }
            }
        }
    }

    /**
     * Answers every pending COMMIT whose offset has already been flushed.
     *
     * <p>If the first pending commit is not yet covered by the flushed offset (or there is none),
     * the method returns without side effects. Otherwise the stream is synced with
     * {@code SyncFlag.UPDATE_LENGTH}, {@code latestAttr} is refreshed, and a
     * {@code COMMIT3Response} is sent for each pending commit at or below the flushed offset.
     *
     * <p>The response status is 0 (NFS3_OK in RFC 1813) only when the sync succeeded, the
     * attributes could be refreshed and the refreshed file size equals {@code j}. In every other
     * case it is 5 (NFS3ERR_IO in RFC 1813).
     *
     * @param j the file size expected after the sync; must be positive
     * @throws IllegalStateException if {@code j} is not positive
     */
    private void processCommits(long j) {
        Preconditions.checkState(j > 0);
        long flushedOffset = 0;
        Map.Entry<Long, CommitCtx> entry = null;
        // Pessimistic default: only a completed hsync upgrades this to success (0).
        int status = 5;
        try {
            flushedOffset = getFlushedOffset();
            entry = this.pendingCommits.firstEntry();
            if (entry == null || entry.getValue().offset > flushedOffset) {
                return;
            }
            // The sync must stay inside the try: hsync throws IOException, and a failure has to
            // be reported to the waiting clients as an error status rather than escape.
            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
            status = 0;
        } catch (ClosedChannelException e) {
            if (!this.pendingWrites.isEmpty()) {
                LOG.error("Can't sync for fileId: " + this.latestAttr.getFileId() + ". Channel closed with writes pending.", e);
            }
            status = 5;
        } catch (IOException e) {
            LOG.error("Got stream error during data sync: ", e);
            status = 5;
        }
        if (entry == null) {
            // The flushed offset could not be read, so no commit was selected for a reply.
            return;
        }
        try {
            this.latestAttr = Nfs3Utils.getFileAttr(this.client, Nfs3Utils.getFileIdPath(this.latestAttr.getFileId()), this.iug);
        } catch (IOException e) {
            LOG.error("Can't get new file attr, fileId: " + this.latestAttr.getFileId(), e);
            status = 5;
        }
        if (this.latestAttr.getSize() != j) {
            LOG.error("After sync, the expect file size: " + j + ", however actual file size is: " + this.latestAttr.getSize());
            status = 5;
        }
        WccData wccData = new WccData(Nfs3Utils.getWccAttr(this.latestAttr), this.latestAttr);
        while (entry != null && entry.getValue().offset <= flushedOffset) {
            this.pendingCommits.remove(entry.getKey());
            CommitCtx commit = entry.getValue();
            COMMIT3Response response = new COMMIT3Response(status, wccData, Nfs3Constant.WRITE_COMMIT_VERF);
            RpcProgramNfs3.metrics.addCommit(Nfs3Utils.getElapsedTime(commit.startTime));
            Nfs3Utils.writeChannelCommit(commit.getChannel(), response.serialize(new XDR(), commit.getXid(), new VerifierNone()), commit.getXid());
            if (LOG.isDebugEnabled()) {
                LOG.debug("FileId: " + this.latestAttr.getFileId() + " Service time: " + Nfs3Utils.getElapsedTime(commit.startTime) + "ns. Sent response for commit: " + commit);
            }
            entry = this.pendingCommits.firstEntry();
        }
    }

    /**
     * Writes one queued request to the output stream and, if the request has not been answered
     * yet, sends the WRITE reply.
     *
     * <p>Steps, in order: write the data to {@code fos}; verify that the flushed offset equals
     * {@code offset + count}; release the in-memory accounting for a request still in
     * {@code ALLOW_DUMP} state; for an unanswered request, sync the stream when the requested
     * stability is not {@code UNSTABLE} and send a success response; finally run
     * {@code processCommits} for the new end offset.
     *
     * <p>Any {@code IOException} is logged, answered with a status-5 response if the request was
     * not replied to, and followed by {@code cleanup()} of this context.
     *
     * @param writeCtx the write request to perform
     */
    private void doSingleWrite(WriteCtx writeCtx) {
        final Channel replyChannel = writeCtx.getChannel();
        final int rpcXid = writeCtx.getXid();
        final long offset = writeCtx.getOffset();
        int count = writeCtx.getCount();
        final Nfs3Constant.WriteStableHow stableHow = writeCtx.getStableHow();
        final FileHandle handle = writeCtx.getHandle();
        if (LOG.isDebugEnabled()) {
            LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " + offset + " length: " + count + " stableHow: " + stableHow.name());
        }
        try {
            writeCtx.writeData(this.fos);
            RpcProgramNfs3.metrics.incrBytesWritten(writeCtx.getCount());

            final long expectedOffset = offset + count;
            final long flushedOffset = getFlushedOffset();
            if (flushedOffset != expectedOffset) {
                throw new IOException("output stream is out of sync, pos=" + flushedOffset + " and nextOffset should be" + expectedOffset);
            }

            // Re-check the state under the request's monitor before giving back its memory quota.
            if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
                synchronized (writeCtx) {
                    if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) {
                        writeCtx.setDataState(WriteCtx.DataState.NO_DUMP);
                        updateNonSequentialWriteInMemory(-count);
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("After writing " + handle.getFileId() + " at offset " + offset + ", updated the memory count, new value: " + this.nonSequentialWriteInMemory.get());
                        }
                    }
                }
            }

            if (!writeCtx.getReplied()) {
                if (stableHow != Nfs3Constant.WriteStableHow.UNSTABLE) {
                    LOG.info("Do sync for stable write: " + writeCtx);
                    try {
                        if (stableHow != Nfs3Constant.WriteStableHow.DATA_SYNC) {
                            Preconditions.checkState(stableHow == Nfs3Constant.WriteStableHow.FILE_SYNC, "Unknown WriteStableHow: " + stableHow);
                            this.fos.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                        } else {
                            this.fos.hsync();
                        }
                    } catch (IOException syncFailure) {
                        LOG.error("hsync failed with writeCtx: " + writeCtx, syncFailure);
                        throw syncFailure;
                    }
                }
                final WccAttr preOpAttr = this.latestAttr.getWccAttr();
                final WccData fileWcc = new WccData(preOpAttr, this.latestAttr);
                final int originalCount = writeCtx.getOriginalCount();
                if (originalCount != -1) {
                    LOG.warn("Return original count: " + writeCtx.getOriginalCount() + " instead of real data count: " + count);
                    count = writeCtx.getOriginalCount();
                }
                final WRITE3Response reply = new WRITE3Response(0, fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF);
                RpcProgramNfs3.metrics.addWrite(Nfs3Utils.getElapsedTime(writeCtx.startTime));
                Nfs3Utils.writeChannel(replyChannel, reply.serialize(new XDR(), rpcXid, new VerifierNone()), rpcXid);
            }

            processCommits(writeCtx.getOffset() + writeCtx.getCount());
        } catch (IOException writeFailure) {
            LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " + offset + " and length " + count, writeFailure);
            if (!writeCtx.getReplied()) {
                final WRITE3Response failure = new WRITE3Response(5);
                Nfs3Utils.writeChannel(replyChannel, failure.serialize(new XDR(), rpcXid, new VerifierNone()), rpcXid);
            }
            LOG.info("Clean up open file context for fileId: " + this.latestAttr.getFileId());
            cleanup();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Deactivates this context and releases everything it holds.
     *
     * <p>Does nothing but log if the context is already inactive. Otherwise, in order: marks the
     * context inactive; interrupts the dump thread and waits up to three seconds for it; closes
     * the output stream; answers every still-pending write that has not been replied to with a
     * status-5 response; closes and deletes the dump file; closes the dump file's
     * {@code RandomAccessFile}. I/O failures while closing are logged and do not abort the
     * remaining steps.
     *
     * <p>If the calling thread is interrupted while waiting for the dump thread, the wait ends
     * early, the remaining cleanup still runs, and the thread's interrupt status is restored
     * before returning so the caller can observe the interruption.
     */
    public synchronized void cleanup() {
        if (!this.activeState) {
            LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
            return;
        }
        this.activeState = false;
        boolean interrupted = false;
        if (this.dumpThread != null && this.dumpThread.isAlive()) {
            this.dumpThread.interrupt();
            try {
                this.dumpThread.join(3000L);
            } catch (InterruptedException e) {
                // Remember the interruption but restore it only after cleanup has finished, so
                // that closing the streams below is not affected by a set interrupt flag.
                interrupted = true;
            }
        }
        try {
            try {
                if (this.fos != null) {
                    this.fos.close();
                }
            } catch (IOException e) {
                LOG.info("Can't close stream for fileId: " + this.latestAttr.getFileId() + ", error: " + e);
            }
            LOG.info("There are " + this.pendingWrites.size() + " pending writes.");
            WccAttr wccAttr = this.latestAttr.getWccAttr();
            while (!this.pendingWrites.isEmpty()) {
                OffsetRange offsetRange = this.pendingWrites.firstKey();
                LOG.info("Fail pending write: (" + offsetRange.getMin() + ", " + offsetRange.getMax() + "), nextOffset=" + this.nextOffset.get());
                WriteCtx writeCtx = this.pendingWrites.remove(offsetRange);
                if (!writeCtx.getReplied()) {
                    Nfs3Utils.writeChannel(writeCtx.getChannel(), new WRITE3Response(5, new WccData(wccAttr, this.latestAttr), 0, writeCtx.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF).serialize(new XDR(), writeCtx.getXid(), new VerifierNone()), writeCtx.getXid());
                }
            }
            if (this.dumpOut != null) {
                try {
                    this.dumpOut.close();
                } catch (IOException e) {
                    LOG.error("Failed to close outputstream of dump file" + this.dumpFilePath, e);
                }
                File file = new File(this.dumpFilePath);
                if (file.exists() && !file.delete()) {
                    LOG.error("Failed to delete dumpfile: " + file);
                }
            }
            if (this.raf != null) {
                try {
                    this.raf.close();
                } catch (IOException e) {
                    LOG.error("Got exception when closing input stream of dump file.", e);
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: returns the pending-writes map itself, not a copy.
     *
     * @return the map held in {@code pendingWrites}
     */
    @VisibleForTesting
    public ConcurrentNavigableMap<OffsetRange, WriteCtx> getPendingWritesForTest() {
        return pendingWrites;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: returns the pending-commits map itself, not a copy.
     *
     * @return the map held in {@code pendingCommits}
     */
    @VisibleForTesting
    public ConcurrentNavigableMap<Long, CommitCtx> getPendingCommitsForTest() {
        return pendingCommits;
    }

    /**
     * Test hook: reads the current value of {@code nextOffset}.
     *
     * @return the value of {@code nextOffset} at the time of the call
     */
    @VisibleForTesting
    long getNextOffsetForTest() {
        return nextOffset.get();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: overwrites {@code nextOffset}.
     *
     * @param j the value to store in {@code nextOffset}
     */
    @VisibleForTesting
    public void setNextOffsetForTest(long j) {
        nextOffset.set(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: overwrites the {@code activeState} flag.
     *
     * @param z the value to store in {@code activeState}
     */
    @VisibleForTesting
    public void setActiveStatusForTest(boolean z) {
        activeState = z;
    }

    /**
     * Renders the context's state for logging.
     *
     * @return a string of the form
     *         {@code "activeState: <bool> asyncStatus: <bool> nextOffset: <long>"}
     */
    @Override
    public String toString() {
        final boolean active = this.activeState;
        final boolean async = this.asyncStatus;
        final long next = this.nextOffset.get();
        return String.format("activeState: %b asyncStatus: %b nextOffset: %d", active, async, next);
    }

    // Class initializer: sets the class-level fields assigned below.
    static {
        // Stores the negation of this class's desired assertion status.
        // NOTE(review): the '$' name suggests a compiler-generated field surfaced by the
        // decompiler rather than hand-written code — confirm before editing.
        $assertionsDisabled = !OpenFileCtx.class.desiredAssertionStatus();
        // Logger for this class, obtained from commons-logging's LogFactory.
        LOG = LogFactory.getLog(OpenFileCtx.class);
        // 1048576 == 1024 * 1024.
        // NOTE(review): presumably a byte threshold used by the dump logic — confirm at the
        // field's use sites.
        DUMP_WRITE_WATER_MARK = 1048576L;
    }
}
