
HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits. Contributed by Ming Ma.

(cherry picked from commit 7817674a3a4d097b647dd77f1345787dd376d5ea)
(cherry picked from commit 17fb442a4c4e43105374c97fccd68dd966729a19)
Jing Zhao, 10 years ago
Parent commit: 6c127b44ca
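
In plain terms, the collision this commit avoids works like this: when a client retries a write RPC against a NameNode that is in standby state, the RPC handler used to create a retry-cache entry (via RetryCache.waitForCompletion) before FSNamesystem's checkOperation(OperationCategory.WRITE) rejected the call with a StandbyException. The standby also populates its retry cache while applying the same operation from the tailed edit log, so the premature entry could collide with the one added during edit replay. The fix moves the write-state check into NameNodeRpcServer, ahead of the retry-cache lookup, so a standby rejects the call before its cache is touched. The following self-contained sketch uses toy stand-ins (TinyRetryCache, a simplified unchecked StandbyException), not the Hadoop implementations, to illustrate the ordering difference:

import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of the ordering change in HDFS-7609; not Hadoop code. */
public class RetryCacheOrderingSketch {

  static class StandbyException extends RuntimeException {}

  /** A toy retry cache keyed by (clientId, callId). */
  static class TinyRetryCache {
    private final Map<String, Boolean> entries = new HashMap<>();

    /** Mimics RetryCache.waitForCompletion: records an entry for the call. */
    boolean startCall(String clientId, int callId) {
      return entries.putIfAbsent(clientId + ":" + callId, Boolean.FALSE) == null;
    }

    boolean contains(String clientId, int callId) {
      return entries.containsKey(clientId + ":" + callId);
    }
  }

  static boolean isActive = false;          // this NameNode is in standby state

  static void checkOperation() {            // stands in for checkOperation(WRITE)
    if (!isActive) throw new StandbyException();
  }

  /** Pre-HDFS-7609 ordering: the cache is touched before the standby check. */
  static void deleteOldOrdering(TinyRetryCache cache, String clientId, int callId) {
    cache.startCall(clientId, callId);      // entry created even on a standby
    checkOperation();                       // ...then the call is rejected
  }

  /** Post-HDFS-7609 ordering: reject on a standby before touching the cache. */
  static void deleteNewOrdering(TinyRetryCache cache, String clientId, int callId) {
    checkOperation();                       // StandbyException thrown first
    cache.startCall(clientId, callId);      // never reached on a standby
  }

  public static void main(String[] args) {
    TinyRetryCache oldCache = new TinyRetryCache();
    try { deleteOldOrdering(oldCache, "client-1", 7); } catch (StandbyException ignored) {}
    // The old ordering leaves a stale entry that can collide with the entry the
    // standby later adds while applying the same operation from the edit log.
    System.out.println("old ordering left entry: " + oldCache.contains("client-1", 7)); // true

    TinyRetryCache newCache = new TinyRetryCache();
    try { deleteNewOrdering(newCache, "client-1", 7); } catch (StandbyException ignored) {}
    System.out.println("new ordering left entry: " + newCache.contains("client-1", 7)); // false
  }
}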

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (+3, -0)

@@ -53,6 +53,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-8431. hdfs crypto class not found in Windows.
     (Anu Engineer via cnauroth)
 
+    HDFS-7609. Avoid retry cache collision when Standby NameNode loading edits.
+    (Ming Ma via jing9)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (+0, -19)

@@ -1874,7 +1874,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void concat(String target, String [] srcs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     waitForLoadingFSImage();
     HdfsFileStatus stat = null;
     boolean success = false;
@@ -2376,7 +2375,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     boolean skipSync = false;
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
     if (blockSize < minBlockSize) {
       throw new IOException("Specified block size is less than configured" +
           " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
@@ -2974,7 +2972,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     LocatedBlock lb = null;
     HdfsFileStatus stat = null;
     FSPermissionChecker pc = getPermissionChecker();
-    checkOperation(OperationCategory.WRITE);
     byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     writeLock();
     try {
@@ -3645,7 +3642,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean renameTo(String src, String dst, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     FSDirRenameOp.RenameOldResult ret = null;
     writeLock();
     try {
@@ -3671,7 +3667,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 boolean logRetryCache, Options.Rename... options)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     Map.Entry<BlocksMapUpdateInfo, HdfsFileStatus> res = null;
     writeLock();
     try {
@@ -3708,7 +3703,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   boolean delete(String src, boolean recursive, boolean logRetryCache)
       throws IOException {
     waitForLoadingFSImage();
-    checkOperation(OperationCategory.WRITE);
     BlocksMapUpdateInfo toRemovedBlocks = null;
     writeLock();
     boolean ret = false;
@@ -6332,8 +6326,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       String clientName, ExtendedBlock oldBlock, ExtendedBlock newBlock,
       DatanodeID[] newNodes, String[] newStorageIDs, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
-
     LOG.info("updatePipeline(" + oldBlock.getLocalBlock()
              + ", newGS=" + newBlock.getGenerationStamp()
              + ", newLength=" + newBlock.getNumBytes()
@@ -7370,7 +7362,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void renameSnapshot(
       String path, String snapshotOldName, String snapshotNewName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -7454,7 +7445,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   void deleteSnapshot(String snapshotRoot, String snapshotName,
       boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     BlocksMapUpdateInfo blocksToBeDeleted = null;
@@ -7682,7 +7672,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   long addCacheDirective(CacheDirectiveInfo directive,
                          EnumSet<CacheFlag> flags, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     CacheDirectiveInfo effectiveDirective = null;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7713,7 +7702,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void modifyCacheDirective(CacheDirectiveInfo directive,
       EnumSet<CacheFlag> flags, boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     if (!flags.contains(CacheFlag.FORCE)) {
       cacheManager.waitForRescanIfNeeded();
@@ -7740,7 +7728,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   void removeCacheDirective(long id, boolean logRetryCache) throws IOException {
-    checkOperation(OperationCategory.WRITE);
     boolean success = false;
     writeLock();
     try {
@@ -7782,7 +7769,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void addCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     String poolInfoStr = null;
@@ -7806,7 +7792,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void modifyCachePool(CachePoolInfo req, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -7830,7 +7815,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void removeCachePool(String cachePoolName, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     writeLock();
     boolean success = false;
     try {
@@ -8028,7 +8012,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     String src = srcArg;
     HdfsFileStatus resultingStat = null;
     checkSuperuserPrivilege();
-    checkOperation(OperationCategory.WRITE);
     final byte[][] pathComponents =
       FSDirectory.getPathComponentsForReservedPath(src);
     FSPermissionChecker pc = getPermissionChecker();
@@ -8114,7 +8097,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag,
                 boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {
@@ -8162,7 +8144,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   void removeXAttr(String src, XAttr xAttr, boolean logRetryCache)
       throws IOException {
-    checkOperation(OperationCategory.WRITE);
     HdfsFileStatus auditStat = null;
     writeLock();
     try {

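Note that the checkOperation(OperationCategory.WRITE) calls removed above are the ones at method entry, before the write lock; the callers in NameNodeRpcServer now perform that check before consulting the retry cache (see the next file), and FSNamesystem is still expected to re-check the state once the write lock is held. A hedged sketch of the resulting method shape, abbreviated and assuming the surrounding FSNamesystem fields and helpers rather than quoting the sources verbatim:

  void delete(String src, boolean recursive, boolean logRetryCache)
      throws IOException {
    waitForLoadingFSImage();
    BlocksMapUpdateInfo toRemovedBlocks = null;
    writeLock();
    boolean ret = false;
    try {
      // The early check at method entry is gone; the state is still re-checked
      // here because it may have changed while waiting for the lock.
      checkOperation(OperationCategory.WRITE);
      checkNameNodeSafeMode("Cannot delete " + src);
      // ... resolve the path, collect blocks, and log the edit ...
      ret = true;
    } finally {
      writeUnlock();
    }
    // ... block removal and audit logging happen outside the lock ...
  }
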
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (+20, -2)

@@ -610,7 +610,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("create: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return (HdfsFileStatus) cacheEntry.getPayload();
@@ -641,6 +641,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       stateChangeLog.debug("*DIR* NameNode.append: file "
           +src+" for "+clientName+" at "+clientMachine);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
         null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -810,6 +811,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -854,7 +856,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
-
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -875,6 +877,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void concat(String trg, String[] src) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -900,6 +903,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("rename: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -937,6 +941,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
           + ", recursive=" + recursive);
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return true; // Return previous response
@@ -1221,6 +1226,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void createSymlink(String target, String link, FsPermission dirPerms,
       boolean createParent) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1552,6 +1558,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
       throw new IOException("createSnapshot: Pathname too long.  Limit "
           + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
       null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1573,6 +1580,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void deleteSnapshot(String snapshotRoot, String snapshotName)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrDeleteSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1611,6 +1619,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
     if (snapshotNewName == null || snapshotNewName.isEmpty()) {
       throw new IOException("The new snapshot name is null or empty.");
     }
+    namesystem.checkOperation(OperationCategory.WRITE);
     metrics.incrRenameSnapshotOps();
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1650,6 +1659,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public long addCacheDirective(
       CacheDirectiveInfo path, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion
       (retryCache, null);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
@@ -1671,6 +1681,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void modifyCacheDirective(
       CacheDirectiveInfo directive, EnumSet<CacheFlag> flags) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1688,6 +1699,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeCacheDirective(long id) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1714,6 +1726,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override //ClientProtocol
   public void addCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1730,6 +1743,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void modifyCachePool(CachePoolInfo info) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1746,6 +1760,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeCachePool(String cachePoolName) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1808,6 +1823,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void createEncryptionZone(String src, String keyName)
     throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     final CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
@@ -1839,6 +1855,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
       throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response
@@ -1868,6 +1885,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void removeXAttr(String src, XAttr xAttr) throws IOException {
     checkNNStartup();
+    namesystem.checkOperation(OperationCategory.WRITE);
     CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return; // Return previous response

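Taken together, every retried write RPC in NameNodeRpcServer now follows the same ordering: startup check, standby check, retry-cache lookup, operation, cache-state update. A hedged sketch of that shape, mirroring the removeXAttr handler touched in the last hunk rather than quoting it verbatim, and assuming the surrounding NameNodeRpcServer fields:

  @Override // ClientProtocol
  public void removeXAttr(String src, XAttr xAttr) throws IOException {
    checkNNStartup();                                    // NameNode fully started?
    namesystem.checkOperation(OperationCategory.WRITE);  // reject early on a standby
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;                                            // replay previous response
    }
    boolean success = false;
    try {
      namesystem.removeXAttr(src, xAttr, cacheEntry != null);
      success = true;
    } finally {
      RetryCache.setState(cacheEntry, success);
    }
  }
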
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java (+35, -2)

@@ -214,7 +214,8 @@ public class TestRetryCacheWithHA {
   abstract class AtMostOnceOp {
     private final String name;
     final DFSClient client;
-    
+    int expectedUpdateCount = 0;
+
     AtMostOnceOp(String name, DFSClient client) {
       this.name = name;
       this.client = client;
@@ -224,6 +225,9 @@ public class TestRetryCacheWithHA {
     abstract void invoke() throws Exception;
     abstract boolean checkNamenodeBeforeReturn() throws Exception;
     abstract Object getResult();
+    int getExpectedCacheUpdateCount() {
+      return expectedUpdateCount;
+    }
   }
   
   /** createSnapshot operaiton */
@@ -603,7 +607,7 @@ public class TestRetryCacheWithHA {
   class DeleteOp extends AtMostOnceOp {
     private final String target;
     private boolean deleted;
-    
+
     DeleteOp(DFSClient client, String target) {
       super("delete", client);
       this.target = target;
@@ -613,12 +617,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       deleted = client.delete(target, true);
     }
 
@@ -654,12 +660,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(target);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.createSymlink(target, link, false);
     }
 
@@ -772,11 +780,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       result = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
@@ -818,12 +828,15 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = client.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCacheDirective(
           new CacheDirectiveInfo.Builder().
               setId(id).
@@ -874,12 +887,15 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       dfs.addCachePool(new CachePoolInfo(directive.getPool()));
+      expectedUpdateCount++;
       id = dfs.addCacheDirective(directive, EnumSet.of(CacheFlag.FORCE));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCacheDirective(id);
     }
 
@@ -921,6 +937,7 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
@@ -953,11 +970,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool).setLimit(10l));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.modifyCachePool(new CachePoolInfo(pool).setLimit(99l));
     }
 
@@ -990,11 +1009,13 @@ public class TestRetryCacheWithHA {
 
     @Override
     void prepare() throws Exception {
+      expectedUpdateCount++;
       client.addCachePool(new CachePoolInfo(pool));
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeCachePool(pool);
     }
 
@@ -1029,12 +1050,14 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
       }
     }
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.setXAttr(src, "user.key", "value".getBytes(),
           EnumSet.of(XAttrSetFlag.CREATE));
     }
@@ -1071,7 +1094,9 @@ public class TestRetryCacheWithHA {
     void prepare() throws Exception {
       Path p = new Path(src);
       if (!dfs.exists(p)) {
+        expectedUpdateCount++;
         DFSTestUtil.createFile(dfs, p, BlockSize, DataNodes, 0);
+        expectedUpdateCount++;
         client.setXAttr(src, "user.key", "value".getBytes(),
           EnumSet.of(XAttrSetFlag.CREATE));
       }
@@ -1079,6 +1104,7 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
+      expectedUpdateCount++;
       client.removeXAttr(src, "user.key");
     }
 
@@ -1315,6 +1341,13 @@ public class TestRetryCacheWithHA {
     assertTrue("CacheUpdated on NN0: " + updatedNN0, updatedNN0 > 0);
     // Cache updated metrics on NN0 should be >0 since NN1 applied the editlog
     assertTrue("CacheUpdated on NN1: " + updatedNN1, updatedNN1 > 0);
+    long expectedUpdateCount = op.getExpectedCacheUpdateCount();
+    if (expectedUpdateCount > 0) {
+      assertEquals("CacheUpdated on NN0: " + updatedNN0, expectedUpdateCount,
+          updatedNN0);
+      assertEquals("CacheUpdated on NN0: " + updatedNN1, expectedUpdateCount,
+          updatedNN1);
+    }
   }
 
   /**