Jelajahi Sumber

Merge r1518851 from trunk (reconcile branch conflicts with the fix for HDFS-5077)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1519882 13f79535-47bb-0310-9956-ffa450edef68
Arpit Agarwal 12 tahun lalu
induk
melakukan
ec57e3019f

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -401,6 +401,9 @@ Release 2.1.1-beta - UNRELEASED
     HDFS-5132. Deadlock in NameNode between SafeModeMonitor#run and 
     DatanodeManager#handleHeartbeat. (kihwal)
 
+    HDFS-5077. NPE in FSNamesystem.commitBlockSynchronization().
+    (Plamen Jeliazkov via shv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

+ 29 - 8
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -3326,7 +3326,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   boolean isFileClosed(String src) 
       throws AccessControlException, UnresolvedLinkException,
       StandbyException, IOException {
-    FSPermissionChecker pc = getPermissionChecker();	
+    FSPermissionChecker pc = getPermissionChecker();  
     checkOperation(OperationCategory.READ);
     readLock();
     try {
@@ -3784,18 +3784,39 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         // find the DatanodeDescriptor objects
         // There should be no locations in the blockManager till now because the
         // file is underConstruction
-        final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
-            .getDatanodeStorageInfos(newtargets, newtargetstorages);
-        if (closeFile && storages != null) {
+        ArrayList<DatanodeDescriptor> trimmedTargets =
+            new ArrayList<DatanodeDescriptor>(newtargets.length);
+        ArrayList<String> trimmedStorages =
+            new ArrayList<String>(newtargets.length);
+        if (newtargets.length > 0) {
+          for (int i = 0; i < newtargets.length; ++i) {
+            // try to get targetNode
+            DatanodeDescriptor targetNode =
+                blockManager.getDatanodeManager().getDatanode(newtargets[i]);
+            if (targetNode != null) {
+              trimmedTargets.add(targetNode);
+              trimmedStorages.add(newtargetstorages[i]);
+            } else if (LOG.isDebugEnabled()) {
+              LOG.debug("DatanodeDescriptor (=" + newtargets[i] + ") not found");
+            }
+          }
+        }
+        if ((closeFile) && !trimmedTargets.isEmpty()) {
           // the file is getting closed. Insert block locations into blockManager.
           // Otherwise fsck will report these blocks as MISSING, especially if the
           // blocksReceived from Datanodes take a long time to arrive.
-          for (int i = 0; i < storages.length; i++) {
-            storages[i].addBlock(storedBlock);
+          for (int i = 0; i < trimmedTargets.size(); i++) {
+            trimmedTargets.get(i).addBlock(
+              trimmedStorages.get(i), storedBlock);
           }
         }
+
         // add pipeline locations into the INodeUnderConstruction
-        pendingFile.setLastBlock(storedBlock, storages);
+        DatanodeStorageInfo[] trimmedStorageInfos =
+            blockManager.getDatanodeManager().getDatanodeStorageInfos(
+                trimmedTargets.toArray(new DatanodeID[trimmedTargets.size()]),
+                trimmedStorages.toArray(new String[trimmedStorages.size()]));
+        pendingFile.setLastBlock(storedBlock, trimmedStorageInfos);
       }
 
       if (closeFile) {
@@ -5823,7 +5844,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException
    */
   Collection<CorruptFileBlockInfo> listCorruptFileBlocks(String path,
-	String[] cookieTab) throws IOException {
+  String[] cookieTab) throws IOException {
     checkSuperuserPrivilege();
     checkOperation(OperationCategory.READ);
     readLock();

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java

@@ -169,4 +169,23 @@ public class TestCommitBlockSynchronization {
     namesystemSpy.commitBlockSynchronization(
         lastBlock, genStamp, length, true, false, newTargets, null);
   }
+
+  @Test
+  public void testCommitBlockSynchronizationWithCloseAndNonExistantTarget()
+      throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[]{
+        new DatanodeID("0.0.0.0", "nonexistantHost", "1", 0, 0, 0)};
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true,
+        false, newTargets, null);
+
+    // Repeat the call to make sure it returns true
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true, false, newTargets, null);
+  }
 }