Browse Source

HDFS-6481. DatanodeManager#getDatanodeStorageInfos() should check the length of storageIDs. (Contributed by szetszwo)

Arpit Agarwal 9 năm trước cách đây
mục cha
commit
33908c6f52

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -1503,6 +1503,9 @@ Release 2.7.2 - UNRELEASED
     HDFS-9317. Document fsck -blockId and -storagepolicy options in branch-2.7.
     (aajisaka)
 
+    HDFS-6481. DatanodeManager#getDatanodeStorageInfos() should check the
+    length of storageIDs. (szetszwo via Arpit Agarwal)
+
 Release 2.7.1 - 2015-07-06
 
   INCOMPATIBLE CHANGES

+ 12 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -505,8 +505,18 @@ public class DatanodeManager {
   }
 
   public DatanodeStorageInfo[] getDatanodeStorageInfos(
-      DatanodeID[] datanodeID, String[] storageIDs)
-          throws UnregisteredNodeException {
+      DatanodeID[] datanodeID, String[] storageIDs,
+      String format, Object... args) throws UnregisteredNodeException {
+    if (datanodeID.length != storageIDs.length) {
+      final String err = (storageIDs.length == 0?
+          "Missing storageIDs: It is likely that the HDFS client,"
+          + " who made this call, is running in an older version of Hadoop"
+          + " which does not support storageIDs."
+          : "Length mismatched: storageIDs.length=" + storageIDs.length + " != "
+          ) + " datanodeID.length=" + datanodeID.length;
+      throw new HadoopIllegalArgumentException(
+          err + ", "+ String.format(format, args));
+    }
     if (datanodeID.length == 0) {
       return null;
     }

+ 19 - 17
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -2489,7 +2489,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       //find datanode storages
       final DatanodeManager dm = blockManager.getDatanodeManager();
-      chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs));
+      chosen = Arrays.asList(dm.getDatanodeStorageInfos(existings, storageIDs,
+          "src=%s, fileId=%d, blk=%s, clientName=%s, clientMachine=%s",
+          src, fileId, blk, clientName, clientMachine));
     } finally {
       readUnlock();
     }
@@ -3264,7 +3266,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
              + ", deleteBlock=" + deleteblock
              + ")");
     checkOperation(OperationCategory.WRITE);
-    String src = "";
+    final String src;
     waitForLoadingFSImage();
     writeLock();
     boolean copyTruncate = false;
@@ -3311,10 +3313,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       }
       long bcId = storedBlock.getBlockCollectionId();
       INodeFile iFile = ((INode)getBlockCollection(bcId)).asFile();
+      src = iFile.getFullPathName();
       if (isFileDeleted(iFile)) {
         throw new FileNotFoundException("File not found: "
-            + iFile.getFullPathName() + ", likely due to delayed block"
-            + " removal");
+            + src + ", likely due to delayed block removal");
       }
       if ((!iFile.isUnderConstruction() || storedBlock.isComplete()) &&
           iFile.getLastBlock().isComplete()) {
@@ -3389,7 +3391,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
         DatanodeStorageInfo[] trimmedStorageInfos =
             blockManager.getDatanodeManager().getDatanodeStorageInfos(
                 trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
-                trimmedStorages.toArray(new String[trimmedStorages.size()]));
+                trimmedStorages.toArray(new String[trimmedStorages.size()]),
+                "src=%s, oldBlock=%s, newgenerationstamp=%d, newlength=%d",
+                src, oldBlock, newgenerationstamp, newlength);
+
         if(copyTruncate) {
           iFile.convertLastBlockToUC(truncatedBlock, trimmedStorageInfos);
         } else {
@@ -3403,16 +3408,15 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
       if (closeFile) {
         if(copyTruncate) {
-          src = closeFileCommitBlocks(iFile, truncatedBlock);
+          closeFileCommitBlocks(src, iFile, truncatedBlock);
           if(!iFile.isBlockInLatestSnapshot(storedBlock)) {
             blockManager.removeBlock(storedBlock);
           }
         } else {
-          src = closeFileCommitBlocks(iFile, storedBlock);
+          closeFileCommitBlocks(src, iFile, storedBlock);
         }
       } else {
         // If this commit does not want to close the file, persist blocks
-        src = iFile.getFullPathName();
         FSDirWriteFileOp.persistBlocks(dir, src, iFile, false);
       }
     } finally {
@@ -3438,20 +3442,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * @throws IOException on error
    */
   @VisibleForTesting
-  String closeFileCommitBlocks(INodeFile pendingFile, BlockInfo storedBlock)
-      throws IOException {
+  void closeFileCommitBlocks(String src, INodeFile pendingFile,
+      BlockInfo storedBlock) throws IOException {
     final INodesInPath iip = INodesInPath.fromINode(pendingFile);
-    final String src = iip.getPath();
 
     // commit the last block and complete it if it has minimum replicas
     commitOrCompleteLastBlock(pendingFile, iip, storedBlock);
 
     //remove lease, close file
     finalizeINodeFileUnderConstruction(src, pendingFile,
-                                       Snapshot.findLatestSnapshot(pendingFile,
-                                                                   Snapshot.CURRENT_STATE_ID));
-
-    return src;
+        Snapshot.findLatestSnapshot(pendingFile, Snapshot.CURRENT_STATE_ID));
   }
 
   /**
@@ -5405,6 +5405,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     assert hasWriteLock();
     // check the vadility of the block and lease holder name
     final INodeFile pendingFile = checkUCBlock(oldBlock, clientName);
+    final String src = pendingFile.getFullPathName();
     final BlockInfo blockinfo = pendingFile.getLastBlock();
     assert !blockinfo.isComplete();
 
@@ -5424,11 +5425,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
     // find the DatanodeDescriptor objects
     final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
-        .getDatanodeStorageInfos(newNodes, newStorageIDs);
+        .getDatanodeStorageInfos(newNodes, newStorageIDs,
+            "src=%s, oldBlock=%s, newBlock=%s, clientName=%s",
+            src, oldBlock, newBlock, clientName);
     blockinfo.getUnderConstructionFeature().setExpectedLocations(
         blockinfo, storages);
 
-    String src = pendingFile.getFullPathName();
     FSDirWriteFileOp.persistBlocks(dir, src, pendingFile, logRetryCache);
   }
 

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java

@@ -80,8 +80,8 @@ public class TestCommitBlockSynchronization {
 
     doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
     doReturn(blockInfo).when(file).getLastBlock();
-    doReturn("").when(namesystemSpy).closeFileCommitBlocks(
-        any(INodeFile.class), any(BlockInfo.class));
+    doNothing().when(namesystemSpy).closeFileCommitBlocks(
+        any(String.class), any(INodeFile.class), any(BlockInfo.class));
     doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
 
     return namesystemSpy;