
BUG-21457: [HDFS-6599] 2.4 addBlock is 10 to 20 times slower compared to 0.23

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611737 13f79535-47bb-0310-9956-ffa450edef68

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
Daryn Sharp, 11 years ago
parent commit 1427ecd385

+ 3 - 0
HDP-CHANGES.txt

@@ -134,3 +134,6 @@ The changes also include jiras that are yet to become available in later apache
 
     MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
    the current context's FS is different. (Benjamin Zhitomirsky via zjshen)
+
+    HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23. (daryn)
+

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java

@@ -336,7 +336,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
     buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
     buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
     buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
-
+    buffer.append("Xceivers: "+getXceiverCount()+"\n");
     buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
     return buffer.toString();
   }

+ 5 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
 
     // check the communication traffic of the target machine
     if (considerLoad) {
-      double avgLoad = 0;
-      if (stats != null) {
-        int size = stats.getNumDatanodesInService();
-        if (size != 0) {
-          avgLoad = (double)stats.getTotalLoad()/size;
-        }
-      }
-      if (node.getXceiverCount() > (2.0 * avgLoad)) {
-        logNodeIsNotChosen(storage, "the node is too busy ");
+      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+      final int nodeLoad = node.getXceiverCount();
+      if (nodeLoad > maxLoad) {
+        logNodeIsNotChosen(storage,
+            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
         return false;
       }
     }
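
The hunk above is the consumer side of the fix: instead of deriving the average load from stats.getTotalLoad()/stats.getNumDatanodesInService() on every call, isGoodTarget() now reads a cached in-service xceiver average and rejects any node running more than twice that. A rough standalone illustration of the threshold (the 2.0 factor and the use of an in-service average come from the diff; the class and method names here are hypothetical, not Hadoop code):

public class LoadCheckSketch {
  // a candidate datanode is rejected when its xceiver count exceeds twice
  // the average load of the datanodes still in service
  static boolean tooBusy(int nodeXceiverCount, double inServiceXceiverAverage) {
    final double maxLoad = 2.0 * inServiceXceiverAverage;
    return nodeXceiverCount > maxLoad;
  }

  public static void main(String[] args) {
    // with an average of 1.5 transfers per in-service node the threshold is 3.0,
    // so a node running 4 xceivers is skipped while one running 3 is accepted
    System.out.println(tooBusy(4, 1.5)); // true
    System.out.println(tooBusy(3, 1.5)); // false
  }
}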

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -804,7 +804,9 @@ public class DatanodeManager {
   }
 
   /** Start decommissioning the specified datanode. */
-  private void startDecommission(DatanodeDescriptor node) {
+  @InterfaceAudience.Private
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
       for (DatanodeStorageInfo storage : node.getStorageInfos()) {
         LOG.info("Start Decommissioning " + node + " " + storage

+ 6 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java

@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
   /** @return the xceiver count */
   public int getXceiverCount();
 
+  /** @return average xceiver count for non-decommission(ing|ed) nodes */
+  public int getInServiceXceiverCount();
+  
+  /** @return number of non-decommission(ing|ed) nodes */
+  public int getNumDatanodesInService();
+  
   /**
    * @return the total used space by data nodes for non-DFS purposes
    * such as storing temporary files on the local file system

+ 20 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -150,6 +150,16 @@ class HeartbeatManager implements DatanodeStatistics {
     return stats.xceiverCount;
   }
   
+  @Override
+  public synchronized int getInServiceXceiverCount() {
+    return stats.nodesInServiceXceiverCount;
+  }
+  
+  @Override
+  public synchronized int getNumDatanodesInService() {
+    return stats.nodesInService;
+  }
+  
   @Override
   public synchronized long getCacheCapacity() {
     return stats.cacheCapacity;
@@ -178,7 +188,7 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!datanodes.contains(d)) {
+    if (!d.isAlive) {
       addDatanode(d);
 
       //update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void addDatanode(final DatanodeDescriptor d) {
+    // update in-service node count
+    stats.add(d);
     datanodes.add(d);
     d.isAlive = true;
   }
@@ -323,6 +335,9 @@ class HeartbeatManager implements DatanodeStatistics {
     private long cacheCapacity = 0L;
     private long cacheUsed = 0L;
 
+    private int nodesInService = 0;
+    private int nodesInServiceXceiverCount = 0;
+
     private int expiredHeartbeats = 0;
 
     private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements DatanodeStatistics {
       blockPoolUsed += node.getBlockPoolUsed();
       xceiverCount += node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService++;
+        nodesInServiceXceiverCount += node.getXceiverCount();
         capacityTotal += node.getCapacity();
         capacityRemaining += node.getRemaining();
       } else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements DatanodeStatistics {
       blockPoolUsed -= node.getBlockPoolUsed();
       xceiverCount -= node.getXceiverCount();
       if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+        nodesInService--;
+        nodesInServiceXceiverCount -= node.getXceiverCount();
         capacityTotal -= node.getCapacity();
         capacityRemaining -= node.getRemaining();
       } else {
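
The counters added above are what make that average cheap to read: the heartbeat statistics object now tracks nodesInService and nodesInServiceXceiverCount incrementally as nodes are added to or removed from the stats, so the namenode no longer has to derive the in-service node count from the live/decommissioned datanode lists on each block allocation (see the FSNamesystem hunk below). A simplified, self-contained model of that bookkeeping; the class name, the boolean parameter, and the decommission transition in main() are illustrative assumptions, not the actual Hadoop code:

public class InServiceStatsSketch {
  private int nodesInService = 0;
  private int nodesInServiceXceiverCount = 0;

  // mirrors the add() path above: only nodes that are neither decommissioning
  // nor decommissioned contribute to the in-service totals
  void add(int xceiverCount, boolean decommInProgressOrDecommissioned) {
    if (!decommInProgressOrDecommissioned) {
      nodesInService++;
      nodesInServiceXceiverCount += xceiverCount;
    }
  }

  // mirrors the subtract() path above
  void subtract(int xceiverCount, boolean decommInProgressOrDecommissioned) {
    if (!decommInProgressOrDecommissioned) {
      nodesInService--;
      nodesInServiceXceiverCount -= xceiverCount;
    }
  }

  // O(1) read used by the block placement check
  double inServiceXceiverAverage() {
    return nodesInService == 0
        ? 0.0 : (double) nodesInServiceXceiverCount / nodesInService;
  }

  public static void main(String[] args) {
    InServiceStatsSketch stats = new InServiceStatsSketch();
    stats.add(2, false);
    stats.add(4, false);
    stats.add(6, false);
    System.out.println(stats.inServiceXceiverAverage()); // 4.0

    // model a node entering decommission: it is subtracted with its old state
    // and re-added with the new one, so it stops influencing the average
    stats.subtract(6, false);
    stats.add(6, true);
    System.out.println(stats.inServiceXceiverAverage()); // 3.0
  }
}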

+ 9 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java

@@ -48,6 +48,15 @@ public interface FSClusterStats {
    * @return Number of datanodes that are both alive and not decommissioned.
    */
   public int getNumDatanodesInService();
+
+  /**
+   * an indication of the average load of non-decommission(ing|ed) nodes
+   * eligible for block placement
+   * 
+   * @return average of the in service number of block transfers and block
+   *         writes that are currently occurring on the cluster.
+   */
+  public double getInServiceXceiverAverage();
 }
    
    

+ 12 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -6882,7 +6882,18 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
 
   @Override // FSClusterStats
   public int getNumDatanodesInService() {
-    return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+    return datanodeStatistics.getNumDatanodesInService();
+  }
+  
+  @Override // for block placement strategy
+  public double getInServiceXceiverAverage() {
+    double avgLoad = 0;
+    final int nodes = getNumDatanodesInService();
+    if (nodes != 0) {
+      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+      avgLoad = (double)xceivers/nodes;
+    }
+    return avgLoad;
   }
 
   public SnapshotManager getSnapshotManager() {
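
getInServiceXceiverAverage() simply divides the cached in-service xceiver total by the cached in-service node count, guarding against division by zero when no nodes are in service. A quick check of that arithmetic using the heartbeat values from the TestReplicationPolicyConsiderLoad hunk further down (the snippet is only an illustration, not Hadoop code):

public class XceiverAverageArithmetic {
  public static void main(String[] args) {
    final int load = 2 + 4 + 4;            // xceivers reported by three of the six nodes
    System.out.println((double) load / 6); // all six in service: ~1.6667
    // after the three nodes that reported no load are decommissioned,
    // only three nodes remain in service
    System.out.println((double) load / 3); // ~3.3333
  }
}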

+ 16 - 8
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.test.PathUtils;
@@ -102,6 +103,7 @@ public class TestReplicationPolicyConsiderLoad {
     }
   }
 
+  private final double EPSILON = 0.0001;
   /**
    * Tests that chooseTarget with considerLoad set to true correctly calculates
    * load with decommissioned nodes.
@@ -110,14 +112,6 @@ public class TestReplicationPolicyConsiderLoad {
   public void testChooseTargetWithDecomNodes() throws IOException {
     namenode.getNamesystem().writeLock();
     try {
-      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
-      // returns false
-      for (int i = 0; i < 3; i++) {
-        DatanodeInfo d = dnManager.getDatanodeByXferAddr(
-            dnrList.get(i).getIpAddr(),
-            dnrList.get(i).getXferPort());
-        d.setDecommissioned();
-      }
       String blockPoolId = namenode.getNamesystem().getBlockPoolId();
       dnManager.handleHeartbeat(dnrList.get(3),
           BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@@ -134,6 +128,20 @@ public class TestReplicationPolicyConsiderLoad {
           blockPoolId, dataNodes[5].getCacheCapacity(),
           dataNodes[5].getCacheRemaining(),
           4, 0, 0);
+      // value in the above heartbeats
+      final int load = 2 + 4 + 4;
+      
+      FSNamesystem fsn = namenode.getNamesystem();
+      assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+      
+      // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+      // returns false
+      for (int i = 0; i < 3; i++) {
+        DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+        dnManager.startDecommission(d);
+        d.setDecommissioned();
+      }
+      assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
 
       // Call chooseTarget()
       DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()

+ 185 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.junit.Assert.*;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.junit.Test;
 
 
@@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport {
       if (cluster != null) {cluster.shutdown();}
     }
   }
+  
+  private static final float EPSILON = 0.0001f;
+  @Test
+  public void testXceiverCount() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    // don't waste time retrying if close fails
+    conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
+    MiniDFSCluster cluster = null;
+
+    final int nodes = 8;
+    final int fileCount = 5;
+    final short fileRepl = 3;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
+      cluster.waitActive();
+
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+
+      // trigger heartbeats in case not already sent
+      triggerHeartbeats(datanodes);
+      
+      // check that all nodes are live and in service
+      int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
+      int expectedInServiceNodes = nodes;
+      int expectedInServiceLoad = nodes;
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // shutdown half the nodes and force a heartbeat check to ensure
+      // counts are accurate
+      for (int i=0; i < nodes/2; i++) {
+        DataNode dn = datanodes.get(i);
+        DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
+        dn.shutdown();
+        dnd.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        expectedInServiceNodes--;
+        assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      }
+
+      // restart the nodes to verify that counts are correct after
+      // node re-registration 
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      datanodes = cluster.getDataNodes();
+      expectedInServiceNodes = nodes;
+      assertEquals(nodes, datanodes.size());
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+      
+      // create streams and hsync to force datastreamers to start
+      DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+      for (int i=0; i < fileCount; i++) {
+        streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
+            .getWrappedStream();
+        streams[i].write("1".getBytes());
+        streams[i].hsync();
+        // the load for writers is 2 because both the write xceiver & packet
+        // responder threads are counted in the load
+        expectedTotalLoad += 2*fileRepl;
+        expectedInServiceLoad += 2*fileRepl;
+      }
+      // force nodes to send load update
+      triggerHeartbeats(datanodes);
+      assertEquals(nodes, namesystem.getNumLiveDataNodes());
+      assertEquals(expectedInServiceNodes,
+          namesystem.getNumDatanodesInService());
+      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+          namesystem.getInServiceXceiverAverage(), EPSILON);
+
+      // decomm a few nodes, subtract their load from the expected load,
+      // trigger heartbeat to force load update
+      for (int i=0; i < fileRepl; i++) {
+        expectedInServiceNodes--;
+        DatanodeDescriptor dnd =
+            dnm.getDatanode(datanodes.get(i).getDatanodeId());
+        expectedInServiceLoad -= dnd.getXceiverCount();
+        dnm.startDecommission(dnd);
+        DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+        Thread.sleep(100);
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // check expected load while closing each stream.  recalc expected
+      // load based on whether the nodes in the pipeline are decomm
+      for (int i=0; i < fileCount; i++) {
+        int decomm = 0;
+        for (DatanodeInfo dni : streams[i].getPipeline()) {
+          DatanodeDescriptor dnd = dnm.getDatanode(dni);
+          expectedTotalLoad -= 2;
+          if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
+            decomm++;
+          } else {
+            expectedInServiceLoad -= 2;
+          }
+        }
+        try {
+          streams[i].close();
+        } catch (IOException ioe) {
+          // nodes will go decommissioned even if there's a UC block whose
+          // other locations are decommissioned too.  we'll ignore that
+          // bug for now
+          if (decomm < fileRepl) {
+            throw ioe;
+          }
+        }
+        triggerHeartbeats(datanodes);
+        // verify node count and loads 
+        assertEquals(nodes, namesystem.getNumLiveDataNodes());
+        assertEquals(expectedInServiceNodes,
+            namesystem.getNumDatanodesInService());
+        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+
+      // shutdown each node, verify node counts based on decomm state
+      for (int i=0; i < nodes; i++) {
+        DataNode dn = datanodes.get(i);
+        dn.shutdown();
+        // force it to appear dead so live count decreases
+        DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
+        dnDesc.setLastUpdate(0L);
+        BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+        assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
+        // first few nodes are already out of service
+        if (i >= fileRepl) {
+          expectedInServiceNodes--;
+        }
+        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+        
+        // live nodes always report a load of 1; with no nodes left the load is 0
+        double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
+        assertEquals((double)expectedXceiverAvg,
+            namesystem.getInServiceXceiverAverage(), EPSILON);
+      }
+      
+      // final sanity check
+      assertEquals(0, namesystem.getNumLiveDataNodes());
+      assertEquals(0, namesystem.getNumDatanodesInService());
+      assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
+      assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  private void triggerHeartbeats(List<DataNode> datanodes)
+      throws IOException, InterruptedException {
+    for (DataNode dn : datanodes) {
+      DataNodeTestUtils.triggerHeartbeat(dn);
+    }
+    Thread.sleep(100);
+  }
 }