|
@@ -315,6 +315,106 @@ public class TestNameNodeMXBean {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testDecommissioningNodes() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
|
|
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 30);
|
|
|
+ MiniDFSCluster cluster = null;
|
|
|
+ HostsFileWriter hostsFileWriter = new HostsFileWriter();
|
|
|
+ hostsFileWriter.initialize(conf, "temp/TestNameNodeMXBean");
|
|
|
+
|
|
|
+ try {
|
|
|
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
|
|
+ cluster.waitActive();
|
|
|
+
|
|
|
+ FSNamesystem fsn = cluster.getNameNode().namesystem;
|
|
|
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
|
|
|
+ ObjectName mxbeanName = new ObjectName(
|
|
|
+ "Hadoop:service=NameNode,name=NameNodeInfo");
|
|
|
+
|
|
|
+ List<String> hosts = new ArrayList<>();
|
|
|
+ for(DataNode dn : cluster.getDataNodes()) {
|
|
|
+ hosts.add(dn.getDisplayName());
|
|
|
+ }
|
|
|
+ hostsFileWriter.initIncludeHosts(hosts.toArray(
|
|
|
+ new String[hosts.size()]));
|
|
|
+ fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
|
|
+
|
|
|
+ // 1. Verify Live nodes
|
|
|
+ String liveNodesInfo = (String) (mbs.getAttribute(mxbeanName,
|
|
|
+ "LiveNodes"));
|
|
|
+ Map<String, Map<String, Object>> liveNodes =
|
|
|
+ (Map<String, Map<String, Object>>) JSON.parse(liveNodesInfo);
|
|
|
+ assertEquals(fsn.getLiveNodes(), liveNodesInfo);
|
|
|
+ assertEquals(fsn.getNumLiveDataNodes(), liveNodes.size());
|
|
|
+
|
|
|
+ for (Map<String, Object> liveNode : liveNodes.values()) {
|
|
|
+ assertTrue(liveNode.containsKey("lastContact"));
|
|
|
+ assertTrue(liveNode.containsKey("xferaddr"));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Add the 1st DataNode to Decommission list
|
|
|
+ hostsFileWriter.initExcludeHost(
|
|
|
+ cluster.getDataNodes().get(0).getDisplayName());
|
|
|
+ fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
|
|
|
+
|
|
|
+ // Wait for the DecommissionManager to complete refresh nodes
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ try {
|
|
|
+ String decomNodesInfo = (String) (mbs.getAttribute(mxbeanName,
|
|
|
+ "DecomNodes"));
|
|
|
+ Map<String, Map<String, Object>> decomNodes =
|
|
|
+ (Map<String, Map<String, Object>>) JSON.parse(decomNodesInfo);
|
|
|
+ if (decomNodes.size() > 0) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, 1000, 60000);
|
|
|
+
|
|
|
+ // 2. Verify Decommission InProgress nodes
|
|
|
+ String decomNodesInfo = (String) (mbs.getAttribute(mxbeanName,
|
|
|
+ "DecomNodes"));
|
|
|
+ Map<String, Map<String, Object>> decomNodes =
|
|
|
+ (Map<String, Map<String, Object>>) JSON.parse(decomNodesInfo);
|
|
|
+ assertEquals(fsn.getDecomNodes(), decomNodesInfo);
|
|
|
+ assertEquals(fsn.getNumDecommissioningDataNodes(), decomNodes.size());
|
|
|
+ assertEquals(0, fsn.getNumDecomLiveDataNodes());
|
|
|
+ assertEquals(0, fsn.getNumDecomDeadDataNodes());
|
|
|
+
|
|
|
+ // Wait for the DecommissionManager to complete check
|
|
|
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
|
|
+ @Override
|
|
|
+ public Boolean get() {
|
|
|
+ if (fsn.getNumDecomLiveDataNodes() == 1) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }, 1000, 60000);
|
|
|
+
|
|
|
+ // 3. Verify Decommissioned nodes
|
|
|
+ decomNodesInfo = (String) (mbs.getAttribute(mxbeanName, "DecomNodes"));
|
|
|
+ decomNodes =
|
|
|
+ (Map<String, Map<String, Object>>) JSON.parse(decomNodesInfo);
|
|
|
+ assertEquals(0, decomNodes.size());
|
|
|
+ assertEquals(fsn.getDecomNodes(), decomNodesInfo);
|
|
|
+ assertEquals(1, fsn.getNumDecomLiveDataNodes());
|
|
|
+ assertEquals(0, fsn.getNumDecomDeadDataNodes());
|
|
|
+ } finally {
|
|
|
+ if (cluster != null) {
|
|
|
+ cluster.shutdown();
|
|
|
+ }
|
|
|
+ hostsFileWriter.cleanup();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout=120000)
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public void testTopUsers() throws Exception {
|