
HADOOP-5124. A few optimizations to FsNamesystem#RecentInvalidateSets. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security@1199463 13f79535-47bb-0310-9956-ffa450edef68
Jitendra Nath Pandey 13 years ago
parent commit 6b65576594

+ 3 - 0
CHANGES.txt

@@ -85,6 +85,9 @@ Release 0.20.205.1 - unreleased
     HADOOP-6886. LocalFileSystem Needs createNonRecursive API.
     (Nicolas Spiegelberg via jitendra)
 
+    HADOOP-5124. A few optimizations to FsNamesystem#RecentInvalidateSets.
+    (Hairong Kuang via jitendra)
+
   BUG FIXES
 
     HADOOP-7740. Fixed security audit logger configuration. 

+ 48 - 28
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -32,6 +32,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
 import java.util.Formatter;
@@ -345,7 +346,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
   private final GenerationStamp generationStamp = new GenerationStamp();
 
   // Ask Datanode only up to this many blocks to delete.
-  private int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
+  int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
 
   // precision of access times.
   private long accessTimePrecision = 0;
@@ -1466,15 +1467,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
           // remove this block from the list of pending blocks to be deleted. 
           // This reduces the possibility of triggering HADOOP-1349.
           //
-          for (Iterator<Collection<Block>> iter = recentInvalidateSets.values().iterator();
-               iter.hasNext();
-               ) {
-            Collection<Block> v = iter.next();
-            if (v.remove(last)) {
-              pendingDeletionBlocksCount--;
+          for (DatanodeDescriptor dd : targets) {
+            String datanodeId = dd.getStorageID();
+            Collection<Block> v = recentInvalidateSets.get(datanodeId);
+            if (v != null && v.remove(last)) {
               if (v.isEmpty()) {
-                iter.remove();
+                recentInvalidateSets.remove(datanodeId);
               }
+              pendingDeletionBlocksCount--;
             }
           }
         }
@@ -1793,7 +1793,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
    * @param b block
    * @param n datanode
    */
-  private void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
+  void addToInvalidatesNoLog(Block b, DatanodeInfo n) {
     Collection<Block> invalidateSet = recentInvalidateSets.get(n.getStorageID());
     if (invalidateSet == null) {
       invalidateSet = new HashSet<Block>();
@@ -2831,13 +2831,37 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
     return replicationWorkFound + invalidationWorkFound;
   }
 
-  private int computeInvalidateWork(int nodesToProcess) {
+  /**
+   * Schedule blocks for deletion at datanodes
+   * @param nodesToProcess number of datanodes to schedule deletion work for
+   * @return total number of blocks scheduled for deletion
+   */
+  int computeInvalidateWork(int nodesToProcess) {
+    int numOfNodes = recentInvalidateSets.size();
+    nodesToProcess = Math.min(numOfNodes, nodesToProcess);
+    
+    // get an array of the keys
+    ArrayList<String> keyArray =
+      new ArrayList<String>(recentInvalidateSets.keySet());
+
+    // randomly pick <i>nodesToProcess</i> nodes 
+    // and put them at [0, nodesToProcess)
+    int remainingNodes = numOfNodes - nodesToProcess;
+    if (nodesToProcess < remainingNodes) {
+      for(int i=0; i<nodesToProcess; i++) {
+        int keyIndex = r.nextInt(numOfNodes-i)+i;
+        Collections.swap(keyArray, keyIndex, i); // swap to front
+      }
+    } else {
+      for(int i=0; i<remainingNodes; i++) {
+        int keyIndex = r.nextInt(numOfNodes-i);
+        Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
+      }
+    }
+    
     int blockCnt = 0;
     for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
-      int work = invalidateWorkForOneNode();
-      if(work == 0)
-        break;
-      blockCnt += work;
+      blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
     }
     return blockCnt;
   }
@@ -3123,28 +3147,24 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
   }
 
   /**
-   * Get blocks to invalidate for the first node 
+   * Get blocks to invalidate for <i>nodeId</i> 
    * in {@link #recentInvalidateSets}.
    * 
    * @return number of blocks scheduled for removal during this iteration.
    */
-  private synchronized int invalidateWorkForOneNode() {
+  private synchronized int invalidateWorkForOneNode(String nodeId) {
     // blocks should not be replicated or removed if safe mode is on
     if (isInSafeMode())
       return 0;
-    if(recentInvalidateSets.isEmpty())
-      return 0;
-    // get blocks to invalidate for the first node
-    String firstNodeId = recentInvalidateSets.keySet().iterator().next();
-    assert firstNodeId != null;
-    DatanodeDescriptor dn = datanodeMap.get(firstNodeId);
+    // get blocks to invalidate for nodeId
+    assert nodeId != null;
+    DatanodeDescriptor dn = datanodeMap.get(nodeId);
     if (dn == null) {
-       removeFromInvalidates(firstNodeId);
-       return 0;
+      recentInvalidateSets.remove(nodeId);
+      return 0;
     }
-
-    Collection<Block> invalidateSet = recentInvalidateSets.get(firstNodeId);
-    if(invalidateSet == null)
+    Collection<Block> invalidateSet = recentInvalidateSets.get(nodeId);
+    if (invalidateSet == null)
       return 0;
 
     ArrayList<Block> blocksToInvalidate = 
@@ -3160,7 +3180,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
 
     // If we send everything in this message, remove this node entry
     if (!it.hasNext()) {
-      removeFromInvalidates(firstNodeId);
+      recentInvalidateSets.remove(nodeId);
     }
 
     dn.addBlocksToBeInvalidated(blocksToInvalidate);
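The random selection added to computeInvalidateWork above is a partial Fisher-Yates shuffle: it either swaps the chosen keys to the front of the list or, when that is cheaper, swaps the unchosen keys to the end, so the number of swaps is at most min(nodesToProcess, remainingNodes). A minimal, self-contained sketch of the same technique (the class and method names are hypothetical, not part of the patch):

// PartialSelection.java -- sketch of the selection used in
// computeInvalidateWork: uniformly choose k of n keys with
// at most min(k, n - k) swaps.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class PartialSelection {
  private static final Random r = new Random();

  /** After this call, positions [0, k) of keys hold k uniformly chosen elements. */
  static <T> void pickK(List<T> keys, int k) {
    int n = keys.size();
    if (k < n - k) {
      // Few winners: Fisher-Yates each chosen element into the front.
      for (int i = 0; i < k; i++) {
        Collections.swap(keys, r.nextInt(n - i) + i, i);
      }
    } else {
      // Few losers: shuffle the n-k unchosen elements out to the end instead.
      for (int i = 0; i < n - k; i++) {
        Collections.swap(keys, r.nextInt(n - i), n - i - 1);
      }
    }
  }

  public static void main(String[] args) {
    List<String> storageIds =
        new ArrayList<String>(Arrays.asList("dn-1", "dn-2", "dn-3", "dn-4", "dn-5"));
    pickK(storageIds, 2);
    System.out.println(storageIds.subList(0, 2)); // two uniformly chosen datanodes
  }
}

Either branch leaves a uniformly distributed subset in [0, nodesToProcess); the swap-to-end branch does not randomize the order within the front segment, which is fine here because only set membership matters.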

+ 57 - 0
src/test/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java

@@ -0,0 +1,57 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+
+import junit.framework.TestCase;
+
+/**
+ * Test if FSNamesystem schedules block invalidation work correctly
+ */
+public class TestComputeInvalidateWork extends TestCase {
+  /**
+   * Test if {@link FSNamesystem#computeInvalidateWork(int)}
+   * can schedule invalidate work correctly 
+   */
+  public void testCompInvalidate() throws Exception {
+    final Configuration conf = new Configuration();
+    final int NUM_OF_DATANODES = 3;
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, NUM_OF_DATANODES, true, null);
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNameNode().getNamesystem();
+      DatanodeDescriptor[] nodes =
+        namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
+      assertEquals(NUM_OF_DATANODES, nodes.length);
+      
+      synchronized (namesystem) {
+        for (int i=0; i<nodes.length; i++) {
+          for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
+            Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
+                GenerationStamp.FIRST_VALID_STAMP);
+            namesystem.addToInvalidatesNoLog(block, nodes[i]);
+          }
+        }
+
+        assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
+            namesystem.computeInvalidateWork(NUM_OF_DATANODES+1));
+        assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
+            namesystem.computeInvalidateWork(NUM_OF_DATANODES));
+        assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
+            namesystem.computeInvalidateWork(NUM_OF_DATANODES-1));
+        int workCount = namesystem.computeInvalidateWork(1);
+        if (workCount == 1) {
+          assertEquals(namesystem.blockInvalidateLimit+1,
+              namesystem.computeInvalidateWork(2));
+        } else {
+          assertEquals(namesystem.blockInvalidateLimit, workCount);
+          assertEquals(2, namesystem.computeInvalidateWork(2));
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
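For reference, the arithmetic the test relies on: each of the 3 datanodes starts with 3*blockInvalidateLimit+1 queued blocks, and each call schedules at most blockInvalidateLimit blocks per selected node. The first two calls therefore drain blockInvalidateLimit blocks from every node, leaving blockInvalidateLimit+1 per node; the third call (over NUM_OF_DATANODES-1 nodes) drains two of the three nodes down to one block each. The final computeInvalidateWork(1) then lands on either a nearly-empty node (returning 1, so the two remaining nodes yield blockInvalidateLimit+1 on the next call) or the untouched node (returning blockInvalidateLimit, leaving three nodes with one block each, so the next call over two nodes yields 2), which is exactly the branch the test asserts on.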