
HDFS-7180. NFSv3 gateway frequently gets stuck due to GC. Contributed by Brandon Li

(cherry picked from commit d71d40a63d198991077d5babd70be5e9787a53f1)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

(cherry picked from commit d27cd5ad771d640b470aabe66c7fbddd508a2967)
Brandon Li · 10 years ago · commit e1baa3a503

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java

@@ -228,7 +228,7 @@ class DFSClientCache {
           RemovalNotification<DFSInputStreamCaheKey, FSDataInputStream> notification) {
         try {
           notification.getValue().close();
-        } catch (IOException e) {
+        } catch (IOException ignored) {
         }
       }
     };
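
The DFSClientCache change above only renames the unused exception variable to ignored, the conventional Java signal that the suppression is deliberate: a stream being evicted from the cache is already being discarded, so a failed close() is not actionable. For context, here is a minimal sketch of the same Guava removal-listener pattern, with placeholder key/value types rather than the gateway's actual cache wiring:

import java.io.Closeable;
import java.io.IOException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

// Sketch: close evicted streams, deliberately ignoring close() failures.
class EvictingStreamCache<V extends Closeable> {
  private final LoadingCache<String, V> cache;

  EvictingStreamCache(CacheLoader<String, V> loader, long maxSize) {
    RemovalListener<String, V> closeOnEvict =
        new RemovalListener<String, V>() {
          @Override
          public void onRemoval(RemovalNotification<String, V> notification) {
            V value = notification.getValue();
            if (value == null) {
              return; // entry may already have been collected
            }
            try {
              value.close();
            } catch (IOException ignored) {
              // The stream is being discarded; nothing useful to do here.
            }
          }
        };
    this.cache = CacheBuilder.newBuilder()
        .maximumSize(maxSize)
        .removalListener(closeOnEvict)
        .build(loader);
  }

  V get(String key) {
    return cache.getUnchecked(key);
  }
}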

+ 33 - 18
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java

@@ -175,7 +175,10 @@ class OpenFileCtx {
   
   private volatile boolean enabledDump;
   private FileOutputStream dumpOut;
+  
+  /** Tracks the data buffered in memory related to non sequential writes */
   private AtomicLong nonSequentialWriteInMemory;
+  
   private RandomAccessFile raf;
   private final String dumpFilePath;
   private Daemon dumpThread;
@@ -205,7 +208,7 @@ class OpenFileCtx {
     return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
   }
   
-  // Increase or decrease the memory occupation of non-sequential writes
+  /** Increase or decrease the memory occupation of non-sequential writes */
   private long updateNonSequentialWriteInMemory(long count) {
     long newValue = nonSequentialWriteInMemory.addAndGet(count);
     if (LOG.isDebugEnabled()) {
@@ -214,8 +217,8 @@ class OpenFileCtx {
     }
 
     Preconditions.checkState(newValue >= 0,
-        "nonSequentialWriteInMemory is negative after update with count "
-            + count);
+        "nonSequentialWriteInMemory is negative " + newValue
+            + " after update with count " + count);
     return newValue;
   }
   
@@ -248,7 +251,7 @@ class OpenFileCtx {
     nonSequentialWriteInMemory = new AtomicLong(0);
   
     this.dumpFilePath = dumpFilePath;  
-    enabledDump = dumpFilePath == null ? false: true;
+    enabledDump = dumpFilePath != null;
     nextOffset = new AtomicLong();
     nextOffset.set(latestAttr.getSize());
     try {	
@@ -271,7 +274,7 @@ class OpenFileCtx {
   }
   
   // Check if need to dump the new writes
-  private void checkDump() {
+  private void waitForDump() {
     if (!enabledDump) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Do nothing, dump is disabled.");
@@ -296,6 +299,14 @@ class OpenFileCtx {
           this.notifyAll();          
         }
       }
+      
+      while (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) {
+        try {
+          this.wait();
+        } catch (InterruptedException ignored) {
+        }
+      }
+
     }
   }
 
@@ -382,6 +393,7 @@ class OpenFileCtx {
           }
           synchronized (OpenFileCtx.this) {
             if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) {
+              OpenFileCtx.this.notifyAll();
               try {
                 OpenFileCtx.this.wait();
                 if (LOG.isDebugEnabled()) {
@@ -398,8 +410,13 @@ class OpenFileCtx {
                 + " enabledDump: " + enabledDump);
           }
         } catch (Throwable t) {
+          // unblock threads with new request
+          synchronized (OpenFileCtx.this) {
+            OpenFileCtx.this.notifyAll();
+          }
           LOG.info("Dumper get Throwable: " + t + ". dumpFilePath: "
               + OpenFileCtx.this.dumpFilePath, t);
+          activeState = false;
         }
       }
     }
@@ -563,10 +580,15 @@ class OpenFileCtx {
       // check if there is a WriteCtx with the same range in pendingWrites
       WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid);
       if (oldWriteCtx == null) {
-        addWrite(writeCtx);
+        pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("New write buffered with xid " + xid + " nextOffset "
+              + cachedOffset + " req offset=" + offset + " mapsize="
+              + pendingWrites.size());
+        }
       } else {
-        LOG.warn("Got a repeated request, same range, with xid:"
-            + writeCtx.getXid());
+        LOG.warn("Got a repeated request, same range, with xid:" + xid
+            + " nextOffset " + cachedOffset + " req offset=" + offset);
       }
       return writeCtx;
     }
@@ -648,7 +670,7 @@ class OpenFileCtx {
       boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx);
       if (!startWriting) {
         // offset > nextOffset. check if we need to dump data
-        checkDump();
+        waitForDump();
         
         // In test, noticed some Linux client sends a batch (e.g., 1MB)
         // of reordered writes and won't send more writes until it gets
@@ -683,7 +705,7 @@ class OpenFileCtx {
   private WRITE3Response processPerfectOverWrite(DFSClient dfsClient,
       long offset, int count, WriteStableHow stableHow, byte[] data,
       String path, WccData wccData, IdUserGroup iug) {
-    WRITE3Response response = null;
+    WRITE3Response response;
 
     // Read the content back
     byte[] readbuffer = new byte[count];
@@ -890,13 +912,6 @@ class OpenFileCtx {
     return COMMIT_STATUS.COMMIT_WAIT;
   }
   
-  private void addWrite(WriteCtx writeCtx) {
-    long offset = writeCtx.getOffset();
-    int count = writeCtx.getCount();
-    // For the offset range (min, max), min is inclusive, and max is exclusive
-    pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx);
-  }
-  
   /**
    * Check stream status to decide if it should be closed
    * @return true, remove stream; false, keep stream
@@ -1191,7 +1206,7 @@ class OpenFileCtx {
       dumpThread.interrupt();
       try {
         dumpThread.join(3000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignored) {
       }
     }
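
The OpenFileCtx changes above are the heart of the fix. Out-of-order client writes are buffered in memory and, past a threshold, spilled ("dumped") to a file on disk; before this patch, writers kept buffering even when the dump thread could not keep up, so the heap filled and long GC pauses made the gateway appear stuck. checkDump() is renamed waitForDump() because it now blocks the caller in a wait() loop while nonSequentialWriteInMemory is at or above DUMP_WRITE_WATER_MARK; the dump thread notifyAll()s waiters when memory falls back below the watermark, and on any Throwable it wakes the blocked writers and sets activeState = false so they are never stranded. A minimal sketch of this watermark back-pressure pattern, with illustrative names and an assumed threshold rather than the actual OpenFileCtx fields:

import java.util.concurrent.atomic.AtomicLong;

// Sketch: writers block once buffered memory reaches a watermark; the
// drain (dump) side wakes them as memory is released.
class WatermarkBackpressure {
  private static final long WATER_MARK = 1024 * 1024; // assumed threshold
  private final AtomicLong inMemory = new AtomicLong(0);

  /** Writer side: account for the bytes, then wait out any overload. */
  synchronized void buffer(long bytes) {
    inMemory.addAndGet(bytes);
    while (inMemory.get() >= WATER_MARK) {
      try {
        wait(); // releases the monitor; woken by drain()
      } catch (InterruptedException ignored) {
      }
    }
  }

  /** Dumper side: release memory and wake any blocked writers. */
  synchronized void drain(long bytes) {
    if (inMemory.addAndGet(-bytes) < WATER_MARK) {
      notifyAll();
    }
  }
}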
     

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java

@@ -220,12 +220,12 @@ class OpenFileCtxCache {
 
   void shutdown() {
     // stop the dump thread
-    if (streamMonitor != null && streamMonitor.isAlive()) {
+    if (streamMonitor.isAlive()) {
       streamMonitor.shouldRun(false);
       streamMonitor.interrupt();
       try {
         streamMonitor.join(3000);
-      } catch (InterruptedException e) {
+      } catch (InterruptedException ignored) {
       }
     }
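
The null check on streamMonitor is dropped, presumably because the monitor is always constructed along with the cache, and the interrupt/join(3000) pair is the usual bounded shutdown handshake: interrupt breaks the worker out of any sleep or wait, and join gives it a limited window to exit. A generic sketch of the pattern (one common refinement, not part of this patch, is restoring the interrupt flag instead of swallowing it):

// Sketch of a bounded stop-and-join shutdown for a worker thread.
final class ThreadShutdown {
  static void stopAndJoin(Thread worker, long timeoutMs) {
    worker.interrupt(); // break out of sleep()/wait() promptly
    try {
      worker.join(timeoutMs); // bounded wait for the thread to finish
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status
    }
  }
}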
     

+ 26 - 26
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java

@@ -282,7 +282,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    GETATTR3Request request = null;
+    GETATTR3Request request;
     try {
       request = GETATTR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -374,7 +374,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    SETATTR3Request request = null;
+    SETATTR3Request request;
     try {
       request = SETATTR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -459,7 +459,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    LOOKUP3Request request = null;
+    LOOKUP3Request request;
     try {
       request = LOOKUP3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -527,7 +527,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    ACCESS3Request request = null;
+    ACCESS3Request request;
     try {
       request = ACCESS3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -594,7 +594,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READLINK3Request request = null;
+    READLINK3Request request;
 
     try {
       request = READLINK3Request.deserialize(xdr);
@@ -668,7 +668,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READ3Request request = null;
+    READ3Request request;
 
     try {
       request = READ3Request.deserialize(xdr);
@@ -710,7 +710,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           securityHandler.getUid(), securityHandler.getGid(),
           securityHandler.getAuxGids(), attrs);
       if ((access & Nfs3Constant.ACCESS3_READ) != 0) {
-        eof = offset < attrs.getSize() ? false : true;
+        eof = offset >= attrs.getSize();
         return new READ3Response(Nfs3Status.NFS3_OK, attrs, 0, eof,
             ByteBuffer.wrap(new byte[0]));
       } else {
@@ -749,7 +749,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
         } catch (IOException e) {
           // TODO: A cleaner way is to throw a new type of exception
           // which requires incompatible changes.
-          if (e.getMessage() == "Stream closed") {
+          if (e.getMessage().equals("Stream closed")) {
             clientCache.invalidateDfsInputStream(userName,
                 Nfs3Utils.getFileIdPath(handle));
             continue;
@@ -769,7 +769,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       if (readCount < 0) {
         readCount = 0;
       }
-      eof = (offset + readCount) < attrs.getSize() ? false : true;
+      eof = (offset + readCount) >= attrs.getSize();
       return new READ3Response(Nfs3Status.NFS3_OK, attrs, readCount, eof,
           ByteBuffer.wrap(readbuffer));
 
@@ -801,7 +801,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    WRITE3Request request = null;
+    WRITE3Request request;
 
     try {
       request = WRITE3Request.deserialize(xdr);
@@ -883,7 +883,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    CREATE3Request request = null;
+    CREATE3Request request;
 
     try {
       request = CREATE3Request.deserialize(xdr);
@@ -1017,7 +1017,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    MKDIR3Request request = null;
+    MKDIR3Request request;
 
     try {
       request = MKDIR3Request.deserialize(xdr);
@@ -1114,7 +1114,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    REMOVE3Request request = null;
+    REMOVE3Request request;
     try {
       request = REMOVE3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1194,7 +1194,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    RMDIR3Request request = null;
+    RMDIR3Request request;
     try {
       request = RMDIR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1375,7 +1375,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    SYMLINK3Request request = null;
+    SYMLINK3Request request;
     try {
       request = SYMLINK3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1431,7 +1431,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
    */
   private DirectoryListing listPaths(DFSClient dfsClient, String dirFileIdPath,
       byte[] startAfter) throws IOException {
-    DirectoryListing dlisting = null;
+    DirectoryListing dlisting;
     try {
       dlisting = dfsClient.listPaths(dirFileIdPath, startAfter);
     } catch (RemoteException e) {
@@ -1468,7 +1468,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    READDIR3Request request = null;
+    READDIR3Request request;
     try {
       request = READDIR3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1492,9 +1492,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
           + cookie + " count: " + count);
     }
 
-    HdfsFileStatus dirStatus = null;
-    DirectoryListing dlisting = null;
-    Nfs3FileAttributes postOpAttr = null;
+    HdfsFileStatus dirStatus;
+    DirectoryListing dlisting;
+    Nfs3FileAttributes postOpAttr;
     long dotdotFileId = 0;
     try {
       String dirFileIdPath = Nfs3Utils.getFileIdPath(handle);
@@ -1657,8 +1657,8 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
     }
 
     HdfsFileStatus dirStatus;
-    DirectoryListing dlisting = null;
-    Nfs3FileAttributes postOpDirAttr = null;
+    DirectoryListing dlisting;
+    Nfs3FileAttributes postOpDirAttr;
     long dotdotFileId = 0;
     HdfsFileStatus dotdotStatus = null;
     try {
@@ -1803,7 +1803,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    FSSTAT3Request request = null;
+    FSSTAT3Request request;
     try {
       request = FSSTAT3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1877,7 +1877,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    FSINFO3Request request = null;
+    FSINFO3Request request;
     try {
       request = FSINFO3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1941,7 +1941,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    PATHCONF3Request request = null;
+    PATHCONF3Request request;
     try {
       request = PATHCONF3Request.deserialize(xdr);
     } catch (IOException e) {
@@ -1992,7 +1992,7 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
       return response;
     }
 
-    COMMIT3Request request = null;
+    COMMIT3Request request;
     try {
       request = COMMIT3Request.deserialize(xdr);
     } catch (IOException e) {
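
Most of the RpcProgramNfs3 changes replace dead "= null" initializers with definite assignment and fold "cond ? false : true" ternaries into direct comparisons, but the "Stream closed" change is a genuine bug fix: == on strings compares references, not contents, so the retry branch only fired when both sides happened to be the same interned object. A small illustration (note that e.getMessage() can be null, so the even safer idiom would be the reversed "Stream closed".equals(e.getMessage())):

public class StringEqualityDemo {
  public static void main(String[] args) {
    String literal = "Stream closed";
    String boxed = new String("Stream closed"); // same text, new object
    System.out.println(literal == boxed);       // false: reference compare
    System.out.println(literal.equals(boxed));  // true: content compare
  }
}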

+ 9 - 3
hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java

@@ -47,14 +47,20 @@ class WriteCtx {
   public static enum DataState {
     ALLOW_DUMP,
     NO_DUMP,
-    DUMPED;
+    DUMPED
   }
 
   private final FileHandle handle;
   private final long offset;
   private final int count;
   
-  //Only needed for overlapped write, referring OpenFileCtx.addWritesToCache()  
+  /**
+   * Some clients can send a write that includes previously written data along
+   * with new data. In such a case the write request is changed to write only
+   * the new data. {@code originalCount} tracks the number of bytes sent in the
+   * request before it was modified to write only the new data. 
+   * @see OpenFileCtx#addWritesToCache for more details
+   */
   private final int originalCount; 
   public static final int INVALID_ORIGINAL_COUNT = -1;
   
@@ -173,7 +179,7 @@ class WriteCtx {
   public void writeData(HdfsDataOutputStream fos) throws IOException {
     Preconditions.checkState(fos != null);
 
-    ByteBuffer dataBuffer = null;
+    ByteBuffer dataBuffer;
     try {
       dataBuffer = getData();
     } catch (Exception e1) {
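
The new javadoc on originalCount documents the overlapped-write case: some clients resend previously written bytes together with new ones, the request is trimmed so only the new tail is written, and originalCount remembers the request's size before trimming. A minimal sketch of that trimming with illustrative names; the real logic lives in OpenFileCtx#addWritesToCache:

// Sketch: trim a write that overlaps data already written up to
// nextOffset, keeping only the new tail. -1 means no trimming occurred,
// mirroring INVALID_ORIGINAL_COUNT above.
final class TrimmedWrite {
  static final int INVALID_ORIGINAL_COUNT = -1;

  final long offset;       // first byte of genuinely new data
  final int count;         // bytes to write after trimming
  final int originalCount; // request size before trimming, or -1

  TrimmedWrite(long reqOffset, int reqCount, long nextOffset) {
    long overlap = nextOffset - reqOffset;
    if (overlap > 0 && overlap < reqCount) {
      this.offset = nextOffset;                // skip the resent prefix
      this.count = (int) (reqCount - overlap); // write only the new tail
      this.originalCount = reqCount;
    } else {
      this.offset = reqOffset;
      this.count = reqCount;
      this.originalCount = INVALID_ORIGINAL_COUNT;
    }
  }
}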

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -577,6 +577,8 @@ Release 2.6.0 - UNRELEASED
 
     HDFS-7215. Add JvmPauseMonitor to NFS gateway (brandonli)
 
+    HDFS-7180. NFSv3 gateway frequently gets stuck due to GC (brandonli)
+
     BREAKDOWN OF HDFS-6581 SUBTASKS AND RELATED JIRAS
   
       HDFS-6921. Add LazyPersist flag to FileStatus. (Arpit Agarwal)