
HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement. Contributed by Haohui Mai.

Haohui Mai 10 years ago
parent
commit
3cb426be45
13 changed files with 182 additions and 145 deletions
  1. +3  -0   hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +7  -7   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  3. +0  -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
  4. +0  -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  5. +0  -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
  6. +39 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  7. +9  -11  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
  8. +3  -25  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  9. +4  -6   hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
  10. +2  -3  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
  11. +4  -2  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
  12. +75 -51 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
  13. +36 -36 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
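
Net effect of the patch, as a hedged sketch (the blockManager reference is assumed to be in scope; every accessor named here appears in the hunks below): the FSClusterStats interface moves into the blockmanagement package and DatanodeManager becomes the owner of its implementation, so load statistics are read like this:

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
    import org.apache.hadoop.hdfs.server.blockmanagement.FSClusterStats;

    // Load statistics are now served by DatanodeManager, not FSNamesystem.
    DatanodeManager dm = blockManager.getDatanodeManager();
    FSClusterStats stats = dm.getFSClusterStats();
    int totalLoad = stats.getTotalLoad();                // total xceiver count on the cluster
    int inService = stats.getNumDatanodesInService();    // nodes eligible for block placement
    double avgLoad = stats.getInServiceXceiverAverage(); // in-service xceivers / in-service nodes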

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -98,6 +98,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7381. Decouple the management of block id and gen stamps from
     FSNamesystem. (wheat9)
 
+    HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement.
+    (wheat9)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 7 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
 import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -111,7 +110,7 @@ public class BlockManager {
   private final DatanodeManager datanodeManager;
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
-  
+
   private final PendingDataNodeMessages pendingDNMessages =
     new PendingDataNodeMessages();
 
@@ -264,9 +263,9 @@ public class BlockManager {
 
   /** Check whether name system is running before terminating */
   private boolean checkNSRunning = true;
-  
-  public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
-      final Configuration conf) throws IOException {
+
+  public BlockManager(final Namesystem namesystem, final Configuration conf)
+    throws IOException {
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
@@ -281,8 +280,9 @@ public class BlockManager {
     blocksMap = new BlocksMap(
         LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, stats, datanodeManager.getNetworkTopology(), 
-        datanodeManager.getHost2DatanodeMap());
+      conf, datanodeManager.getFSClusterStats(),
+      datanodeManager.getNetworkTopology(),
+      datanodeManager.getHost2DatanodeMap());
     storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java

@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java

@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
 import org.apache.hadoop.net.Node;

+ 39 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -66,6 +66,7 @@ public class DatanodeManager {
   private final Namesystem namesystem;
   private final BlockManager blockManager;
   private final HeartbeatManager heartbeatManager;
+  private final FSClusterStats fsClusterStats;
   private Daemon decommissionthread = null;
 
   /**
@@ -169,13 +170,14 @@ public class DatanodeManager {
    * directives that we've already sent.
    */
   private final long timeBetweenResendingCachingDirectivesMs;
-  
+
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
     
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
+    this.fsClusterStats = newFSClusterStats();
 
     networktopology = NetworkTopology.getInstance(conf);
 
@@ -329,6 +331,11 @@ public class DatanodeManager {
     return heartbeatManager;
   }
 
+  @VisibleForTesting
+  public FSClusterStats getFSClusterStats() {
+    return fsClusterStats;
+  }
+
   /** @return the datanode statistics. */
   public DatanodeStatistics getDatanodeStatistics() {
     return heartbeatManager;
@@ -1594,5 +1601,36 @@ public class DatanodeManager {
   public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
     this.shouldSendCachingCommands = shouldSendCachingCommands;
   }
+
+  FSClusterStats newFSClusterStats() {
+    return new FSClusterStats() {
+      @Override
+      public int getTotalLoad() {
+        return heartbeatManager.getXceiverCount();
+      }
+
+      @Override
+      public boolean isAvoidingStaleDataNodesForWrite() {
+        return shouldAvoidStaleDataNodesForWrite();
+      }
+
+      @Override
+      public int getNumDatanodesInService() {
+        return heartbeatManager.getNumDatanodesInService();
+      }
+
+      @Override
+      public double getInServiceXceiverAverage() {
+        double avgLoad = 0;
+        final int nodes = getNumDatanodesInService();
+        if (nodes != 0) {
+          final int xceivers = heartbeatManager
+            .getInServiceXceiverCount();
+          avgLoad = (double)xceivers/nodes;
+        }
+        return avgLoad;
+      }
+    };
+  }
 }
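
The averaging logic in the anonymous class above reduces to a guarded division; a standalone restatement for clarity (the method name is illustrative, not part of the patch):

    /** Average in-service xceiver load; 0 when no datanodes are in service. */
    static double inServiceXceiverAverage(int inServiceXceivers, int inServiceNodes) {
      // Avoid division by zero when every node is decommissioned or dead.
      return inServiceNodes == 0 ? 0.0 : (double) inServiceXceivers / inServiceNodes;
    }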
 

+ 9 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java → hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java

@@ -15,12 +15,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.server.namenode;
+package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
-/** 
- * This interface is used for retrieving the load related statistics of 
+/**
+ * This interface is used for retrieving the load related statistics of
  * the cluster.
  */
 @InterfaceAudience.Private
@@ -28,17 +28,17 @@ public interface FSClusterStats {
 
   /**
    * an indication of the total load of the cluster.
-   * 
+   *
    * @return a count of the total number of block transfers and block
    *         writes that are currently occuring on the cluster.
    */
   public int getTotalLoad();
-  
+
   /**
-   * Indicate whether or not the cluster is now avoiding 
+   * Indicate whether or not the cluster is now avoiding
    * to use stale DataNodes for writing.
-   * 
-   * @return True if the cluster is currently avoiding using stale DataNodes 
+   *
+   * @return True if the cluster is currently avoiding using stale DataNodes
    *         for writing targets, and false otherwise.
    */
   public boolean isAvoidingStaleDataNodesForWrite();
@@ -52,11 +52,9 @@ public interface FSClusterStats {
   /**
    * an indication of the average load of non-decommission(ing|ed) nodes
    * eligible for block placement
-   * 
+   *
    * @return average of the in service number of block transfers and block
    *         writes that are currently occurring on the cluster.
    */
   public double getInServiceXceiverAverage();
 }
-    
-    
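
With the interface relocated next to its consumers, an implementation needs only the four methods declared above; a hypothetical stub for unit tests could look like this (all return values are placeholders):

    FSClusterStats stubStats = new FSClusterStats() {
      @Override public int getTotalLoad() { return 0; }
      @Override public boolean isAvoidingStaleDataNodesForWrite() { return false; }
      @Override public int getNumDatanodesInService() { return 1; }
      @Override public double getInServiceXceiverAverage() { return 0.0; }
    };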

+ 3 - 25
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -309,8 +309,8 @@ import com.google.common.collect.Lists;
  ***************************************************/
 @InterfaceAudience.Private
 @Metrics(context="dfs")
-public class FSNamesystem implements Namesystem, FSClusterStats,
-    FSNamesystemMBean, NameNodeMXBean {
+public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+  NameNodeMXBean {
   public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
 
   private static final ThreadLocal<StringBuilder> auditBuffer =
@@ -761,7 +761,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
           DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
 
-      this.blockManager = new BlockManager(this, this, conf);
+      this.blockManager = new BlockManager(this, conf);
       this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
       this.blockIdManager = new BlockIdManager(blockManager);
 
@@ -7825,28 +7825,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     this.nnResourceChecker = nnResourceChecker;
   }
 
-  @Override
-  public boolean isAvoidingStaleDataNodesForWrite() {
-    return this.blockManager.getDatanodeManager()
-        .shouldAvoidStaleDataNodesForWrite();
-  }
-
-  @Override // FSClusterStats
-  public int getNumDatanodesInService() {
-    return datanodeStatistics.getNumDatanodesInService();
-  }
-  
-  @Override // for block placement strategy
-  public double getInServiceXceiverAverage() {
-    double avgLoad = 0;
-    final int nodes = getNumDatanodesInService();
-    if (nodes != 0) {
-      final int xceivers = datanodeStatistics.getInServiceXceiverCount();
-      avgLoad = (double)xceivers/nodes;
-    }
-    return avgLoad;
-  }
-
   public SnapshotManager getSnapshotManager() {
     return snapshotManager;
   }
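
Since FSNamesystem no longer implements FSClusterStats, callers of the removed methods go through the block-management layer instead, exactly as the test updates below do; for an FSNamesystem reference fsn:

    // Replacement for the removed FSNamesystem#getInServiceXceiverAverage():
    double avg = fsn.getBlockManager().getDatanodeManager()
        .getFSClusterStats().getInServiceXceiverAverage();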

+ 4 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -81,19 +81,17 @@ public class TestBlockManager {
   private static final int NUM_TEST_ITERS = 30;
   
   private static final int BLOCK_SIZE = 64*1024;
-  
-  private Configuration conf;
+
   private FSNamesystem fsn;
   private BlockManager bm;
 
   @Before
   public void setupMockCluster() throws IOException {
-    conf = new HdfsConfiguration();
-    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
-        "need to set a dummy value here so it assumes a multi-rack cluster");
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "need to set a dummy value here so it assumes a multi-rack cluster");
     fsn = Mockito.mock(FSNamesystem.class);
     Mockito.doReturn(true).when(fsn).hasWriteLock();
-    bm = new BlockManager(fsn, fsn, conf);
+    bm = new BlockManager(fsn, conf);
     final String[] racks = {
         "/rackA",
         "/rackA",

+ 2 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java

@@ -49,12 +49,11 @@ import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -130,7 +130,8 @@ public class TestReplicationPolicyConsiderLoad {
       final int load = 2 + 4 + 4;
       
       FSNamesystem fsn = namenode.getNamesystem();
-      assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+      assertEquals((double)load/6, dnManager.getFSClusterStats()
+        .getInServiceXceiverAverage(), EPSILON);
       
       // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
       // returns false
@@ -139,7 +140,8 @@ public class TestReplicationPolicyConsiderLoad {
         dnManager.startDecommission(d);
         d.setDecommissioned();
       }
-      assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
+      assertEquals((double)load/3, dnManager.getFSClusterStats()
+        .getInServiceXceiverAverage(), EPSILON);
 
       // update references of writer DN to update the de-commissioned state
       List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();

+ 75 - 51
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java

@@ -17,18 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -43,6 +32,18 @@ import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Class for testing {@link NameNodeMXBean} implementation
  */
@@ -62,14 +63,11 @@ public class TestNameNodeMXBean {
   public void testNameNodeMXBeanInfo() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-        NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
-    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
-
+      NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
     MiniDFSCluster cluster = null;
 
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster = new MiniDFSCluster.Builder(conf).build();
       cluster.waitActive();
 
       FSNamesystem fsn = cluster.getNameNode().namesystem;
@@ -77,29 +75,6 @@ public class TestNameNodeMXBean {
       MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
       ObjectName mxbeanName = new ObjectName(
           "Hadoop:service=NameNode,name=NameNodeInfo");
-
-      // Define include file to generate deadNodes metrics
-      FileSystem localFileSys = FileSystem.getLocal(conf);
-      Path workingDir = localFileSys.getWorkingDirectory();
-      Path dir = new Path(workingDir,
-          "build/test/data/temp/TestNameNodeMXBean");
-      Path includeFile = new Path(dir, "include");
-      assertTrue(localFileSys.mkdirs(dir));
-      StringBuilder includeHosts = new StringBuilder();
-      for(DataNode dn : cluster.getDataNodes()) {
-        includeHosts.append(dn.getDisplayName()).append("\n");
-      }
-      DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
-      conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
-      fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
-
-      cluster.stopDataNode(0);
-      while (fsn.getNumDatanodesInService() != 2) {
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {}
-      }
-
       // get attribute "ClusterId"
       String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
       assertEquals(fsn.getClusterId(), clusterId);
@@ -127,8 +102,7 @@ public class TestNameNodeMXBean {
       // get attribute percentremaining
       Float percentremaining = (Float) (mbs.getAttribute(mxbeanName,
           "PercentRemaining"));
-      assertEquals(fsn.getPercentRemaining(), percentremaining
-          .floatValue(), DELTA);
+      assertEquals(fsn.getPercentRemaining(), percentremaining, DELTA);
       // get attribute Totalblocks
       Long totalblocks = (Long) (mbs.getAttribute(mxbeanName, "TotalBlocks"));
       assertEquals(fsn.getTotalBlocks(), totalblocks.longValue());
@@ -151,15 +125,6 @@ public class TestNameNodeMXBean {
       String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
           "DeadNodes"));
       assertEquals(fsn.getDeadNodes(), deadnodeinfo);
-      Map<String, Map<String, Object>> deadNodes =
-          (Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
-      assertTrue(deadNodes.size() > 0);
-      for (Map<String, Object> deadNode : deadNodes.values()) {
-        assertTrue(deadNode.containsKey("lastContact"));
-        assertTrue(deadNode.containsKey("decommissioned"));
-        assertTrue(deadNode.containsKey("xferaddr"));
-      }
-
       // get attribute NodeUsage
       String nodeUsage = (String) (mbs.getAttribute(mxbeanName,
           "NodeUsage"));
@@ -233,4 +198,63 @@ public class TestNameNodeMXBean {
       }
     }
   }
+
+  @SuppressWarnings({ "unchecked" })
+  @Test
+  public void testLastContactTime() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
+    MiniDFSCluster cluster = null;
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+
+      FSNamesystem fsn = cluster.getNameNode().namesystem;
+
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanName = new ObjectName(
+        "Hadoop:service=NameNode,name=NameNodeInfo");
+
+      // Define include file to generate deadNodes metrics
+      FileSystem localFileSys = FileSystem.getLocal(conf);
+      Path workingDir = localFileSys.getWorkingDirectory();
+      Path dir = new Path(workingDir,
+        "build/test/data/temp/TestNameNodeMXBean");
+      Path includeFile = new Path(dir, "include");
+      assertTrue(localFileSys.mkdirs(dir));
+      StringBuilder includeHosts = new StringBuilder();
+      for(DataNode dn : cluster.getDataNodes()) {
+        includeHosts.append(dn.getDisplayName()).append("\n");
+      }
+      DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
+      conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+      fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
+
+      cluster.stopDataNode(0);
+      while (fsn.getBlockManager().getDatanodeManager().getNumLiveDataNodes()
+        != 2 ) {
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+      }
+
+      // get attribute deadnodeinfo
+      String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
+        "DeadNodes"));
+      assertEquals(fsn.getDeadNodes(), deadnodeinfo);
+      Map<String, Map<String, Object>> deadNodes =
+        (Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
+      assertTrue(deadNodes.size() > 0);
+      for (Map<String, Object> deadNode : deadNodes.values()) {
+        assertTrue(deadNode.containsKey("lastContact"));
+        assertTrue(deadNode.containsKey("decommissioned"));
+        assertTrue(deadNode.containsKey("xferaddr"));
+      }
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }
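
A side cleanup worth noting in the new test: the hand-rolled sleep loop from the old test is replaced with Guava's interrupt-safe helper (both imports appear in the hunks above):

    // Old idiom:
    //   try { Thread.sleep(1000); } catch (InterruptedException e) {}
    // New idiom:
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);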

+ 36 - 36
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

@@ -193,11 +193,7 @@ public class TestNamenodeCapacityReport {
       int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
       int expectedInServiceNodes = nodes;
       int expectedInServiceLoad = nodes;
-      assertEquals(nodes, namesystem.getNumLiveDataNodes());
-      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
-      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
-      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
-          namesystem.getInServiceXceiverAverage(), EPSILON);
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
       
       // shutdown half the nodes and force a heartbeat check to ensure
       // counts are accurate
@@ -209,7 +205,7 @@ public class TestNamenodeCapacityReport {
         BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
         expectedInServiceNodes--;
         assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
-        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+        assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
       }
 
       // restart the nodes to verify that counts are correct after
@@ -219,11 +215,7 @@ public class TestNamenodeCapacityReport {
       datanodes = cluster.getDataNodes();
       expectedInServiceNodes = nodes;
       assertEquals(nodes, datanodes.size());
-      assertEquals(nodes, namesystem.getNumLiveDataNodes());
-      assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
-      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
-      assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
-          namesystem.getInServiceXceiverAverage(), EPSILON);
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
       
       // create streams and hsync to force datastreamers to start
       DFSOutputStream[] streams = new DFSOutputStream[fileCount];
@@ -239,12 +231,7 @@ public class TestNamenodeCapacityReport {
       }
       // force nodes to send load update
       triggerHeartbeats(datanodes);
-      assertEquals(nodes, namesystem.getNumLiveDataNodes());
-      assertEquals(expectedInServiceNodes,
-          namesystem.getNumDatanodesInService());
-      assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
-      assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
-          namesystem.getInServiceXceiverAverage(), EPSILON);
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
 
       // decomm a few nodes, substract their load from the expected load,
       // trigger heartbeat to force load update
@@ -256,12 +243,7 @@ public class TestNamenodeCapacityReport {
         dnm.startDecommission(dnd);
         DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
         Thread.sleep(100);
-        assertEquals(nodes, namesystem.getNumLiveDataNodes());
-        assertEquals(expectedInServiceNodes,
-            namesystem.getNumDatanodesInService());
-        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
-        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
-            namesystem.getInServiceXceiverAverage(), EPSILON);
+        checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
       }
       
       // check expected load while closing each stream.  recalc expected
@@ -289,12 +271,7 @@ public class TestNamenodeCapacityReport {
         }
         triggerHeartbeats(datanodes);
         // verify node count and loads 
-        assertEquals(nodes, namesystem.getNumLiveDataNodes());
-        assertEquals(expectedInServiceNodes,
-            namesystem.getNumDatanodesInService());
-        assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
-        assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
-            namesystem.getInServiceXceiverAverage(), EPSILON);
+        checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
       }
 
       // shutdown each node, verify node counts based on decomm state
@@ -310,26 +287,49 @@ public class TestNamenodeCapacityReport {
         if (i >= fileRepl) {
           expectedInServiceNodes--;
         }
-        assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+        assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
         
         // live nodes always report load of 1.  no nodes is load 0
         double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
         assertEquals((double)expectedXceiverAvg,
-            namesystem.getInServiceXceiverAverage(), EPSILON);
+            getInServiceXceiverAverage(namesystem), EPSILON);
       }
       
       // final sanity check
-      assertEquals(0, namesystem.getNumLiveDataNodes());
-      assertEquals(0, namesystem.getNumDatanodesInService());
-      assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
-      assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+      checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
     } finally {
       if (cluster != null) {
         cluster.shutdown();
       }
     }
   }
-  
+
+  private static void checkClusterHealth(
+    int numOfLiveNodes,
+    FSNamesystem namesystem, double expectedTotalLoad,
+    int expectedInServiceNodes, double expectedInServiceLoad) {
+
+    assertEquals(numOfLiveNodes, namesystem.getNumLiveDataNodes());
+    assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
+    assertEquals(expectedTotalLoad, namesystem.getTotalLoad(), EPSILON);
+    if (expectedInServiceNodes != 0) {
+      assertEquals(expectedInServiceLoad / expectedInServiceNodes,
+        getInServiceXceiverAverage(namesystem), EPSILON);
+    } else {
+      assertEquals(0.0, getInServiceXceiverAverage(namesystem), EPSILON);
+    }
+  }
+
+  private static int getNumDNInService(FSNamesystem fsn) {
+    return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
+      .getNumDatanodesInService();
+  }
+
+  private static double getInServiceXceiverAverage(FSNamesystem fsn) {
+    return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
+      .getInServiceXceiverAverage();
+  }
+
   private void triggerHeartbeats(List<DataNode> datanodes)
       throws IOException, InterruptedException {
     for (DataNode dn : datanodes) {