소스 검색

YARN-1101. Active nodes can be decremented below 0 (Robert Parker via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1518386 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 11 년 전
부모
커밋
669e939edc

+ 6 - 0
hadoop-yarn-project/CHANGES.txt

@@ -102,6 +102,9 @@ Release 2.1.1-beta - UNRELEASED
     YARN-602. Fixed NodeManager to not let users override some mandatory 
     environmental variables. (Kenji Kikushima  via vinodkv)
 
+    YARN-1101. Active nodes can be decremented below 0 (Robert Parker 
+    via tgraves)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES
@@ -1220,6 +1223,9 @@ Release 0.23.10 - UNRELEASED
 
     YARN-337. RM handles killed application tracking URL poorly (jlowe)
 
+    YARN-1101. Active nodes can be decremented below 0 (Robert Parker 
+    via tgraves)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 16 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -393,9 +393,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
-  private void updateMetricsForDeactivatedNode(NodeState finalState) {
+  private void updateMetricsForDeactivatedNode(NodeState initialState,
+                                               NodeState finalState) {
     ClusterMetrics metrics = ClusterMetrics.getMetrics();
-    metrics.decrNumActiveNodes();
+
+    switch (initialState) {
+      case RUNNING:
+        metrics.decrNumActiveNodes();
+        break;
+      case UNHEALTHY:
+        metrics.decrNumUnhealthyNMs();
+        break;
+    }
 
     switch (finalState) {
     case DECOMMISSIONED:
@@ -505,7 +514,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       // If the current state is NodeState.UNHEALTHY
       // Then node is already been removed from the
       // Scheduler
-      if (!rmNode.getState().equals(NodeState.UNHEALTHY)) {
+      NodeState initialState = rmNode.getState();
+      if (!initialState.equals(NodeState.UNHEALTHY)) {
         rmNode.context.getDispatcher().getEventHandler()
           .handle(new NodeRemovedSchedulerEvent(rmNode));
       }
@@ -520,7 +530,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
 
       //Update the metrics
-      rmNode.updateMetricsForDeactivatedNode(finalState);
+      rmNode.updateMetricsForDeactivatedNode(initialState, finalState);
     }
   }
 
@@ -550,7 +560,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
             new NodesListManagerEvent(
                 NodesListManagerEventType.NODE_UNUSABLE, rmNode));
         // Update metrics
-        rmNode.updateMetricsForDeactivatedNode(NodeState.UNHEALTHY);
+        rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
+            NodeState.UNHEALTHY);
         return NodeState.UNHEALTHY;
       }
 

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -267,7 +267,21 @@ public class TestRMNodeTransitions {
   @Test
   public void testUnhealthyExpire() {
     RMNodeImpl node = getUnhealthyNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
     node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.EXPIRE));
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy - 1, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
     Assert.assertEquals(NodeState.LOST, node.getState());
   }
   
@@ -291,8 +305,22 @@ public class TestRMNodeTransitions {
   @Test
   public void testUnhealthyDecommission() {
     RMNodeImpl node = getUnhealthyNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
     node.handle(new RMNodeEvent(node.getNodeID(),
         RMNodeEventType.DECOMMISSION));
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy - 1, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned + 1, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted, cm.getNumRebootedNMs());
     Assert.assertEquals(NodeState.DECOMMISSIONED, node.getState());
   }
 
@@ -307,8 +335,22 @@ public class TestRMNodeTransitions {
   @Test
   public void testUnhealthyRebooting() {
     RMNodeImpl node = getUnhealthyNode();
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
     node.handle(new RMNodeEvent(node.getNodeID(),
         RMNodeEventType.REBOOTING));
+    Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes",
+        initialUnhealthy - 1, cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes",
+        initialDecommissioned, cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes",
+        initialRebooted + 1, cm.getNumRebootedNMs());
     Assert.assertEquals(NodeState.REBOOTED, node.getState());
   }