Explorar el Código

YARN-3266. RMContext#inactiveNodes should have NodeId as map key. Contributed by Chengbing Liu

Jian He hace 10 años
padre
commit
b46ee1e7a3
Se han modificado 9 ficheros con 72 adiciones y 21 borrados
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  3. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  4. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  5. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  6. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
  7. 49 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  8. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java
  9. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -192,6 +192,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3472. Fixed possible leak in DelegationTokenRenewer#allTokens.
     (Rohith Sharmaks via jianhe)
 
+    YARN-3266. RMContext#inactiveNodes should have NodeId as map key.
+    (Chengbing Liu via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -68,8 +68,8 @@ public class RMActiveServiceContext {
   private final ConcurrentMap<NodeId, RMNode> nodes =
       new ConcurrentHashMap<NodeId, RMNode>();
 
-  private final ConcurrentMap<String, RMNode> inactiveNodes =
-      new ConcurrentHashMap<String, RMNode>();
+  private final ConcurrentMap<NodeId, RMNode> inactiveNodes =
+      new ConcurrentHashMap<NodeId, RMNode>();
 
   private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
       new ConcurrentHashMap<ApplicationId, ByteBuffer>();
@@ -185,7 +185,7 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
-  public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
+  public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
     return this.inactiveNodes;
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -61,7 +61,7 @@ public interface RMContext {
   
   ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
 
-  ConcurrentMap<String, RMNode> getInactiveRMNodes();
+  ConcurrentMap<NodeId, RMNode> getInactiveRMNodes();
 
   ConcurrentMap<NodeId, RMNode> getRMNodes();
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -149,7 +149,7 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
-  public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
+  public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
     return activeServiceContext.getInactiveRMNodes();
   }
 

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -524,11 +524,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event;
       List<NMContainerStatus> containers = null;
 
-      String host = rmNode.nodeId.getHost();
-      if (rmNode.context.getInactiveRMNodes().containsKey(host)) {
+      NodeId nodeId = rmNode.nodeId;
+      if (rmNode.context.getInactiveRMNodes().containsKey(nodeId)) {
         // Old node rejoining
-        RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(host);
-        rmNode.context.getInactiveRMNodes().remove(host);
+        RMNode previouRMNode = rmNode.context.getInactiveRMNodes().get(nodeId);
+        rmNode.context.getInactiveRMNodes().remove(nodeId);
         rmNode.updateMetricsForRejoinedNode(previouRMNode.getState());
       } else {
         // Increment activeNodes explicitly because this is a new node.
@@ -737,7 +737,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.context.getRMNodes().remove(rmNode.nodeId);
       LOG.info("Deactivating Node " + rmNode.nodeId + " as it is now "
           + finalState);
-      rmNode.context.getInactiveRMNodes().put(rmNode.nodeId.getHost(), rmNode);
+      rmNode.context.getInactiveRMNodes().put(rmNode.nodeId, rmNode);
 
       //Update the metrics
       rmNode.updateMetricsForDeactivatedNode(initialState, finalState);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java

@@ -320,7 +320,7 @@ public class RMWebServices {
     RMNode ni = this.rm.getRMContext().getRMNodes().get(nid);
     boolean isInactive = false;
     if (ni == null) {
-      ni = this.rm.getRMContext().getInactiveRMNodes().get(nid.getHost());
+      ni = this.rm.getRMContext().getInactiveRMNodes().get(nid);
       if (ni == null) {
         throw new NotFoundException("nodeId, " + nodeId + ", is not found");
       }

+ 49 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -306,6 +306,47 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodeState.LOST, node.getState());
   }
 
+  @Test
+  public void testRunningExpireMultiple() {
+    RMNodeImpl node1 = getRunningNode(null, 10001);
+    RMNodeImpl node2 = getRunningNode(null, 10002);
+    ClusterMetrics cm = ClusterMetrics.getMetrics();
+    int initialActive = cm.getNumActiveNMs();
+    int initialLost = cm.getNumLostNMs();
+    int initialUnhealthy = cm.getUnhealthyNMs();
+    int initialDecommissioned = cm.getNumDecommisionedNMs();
+    int initialRebooted = cm.getNumRebootedNMs();
+    node1.handle(new RMNodeEvent(node1.getNodeID(), RMNodeEventType.EXPIRE));
+    Assert.assertEquals("Active Nodes", initialActive - 1, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost + 1, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
+        cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
+        cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes", initialRebooted,
+        cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.LOST, node1.getState());
+    Assert.assertTrue("Node " + node1.toString() + " should be inactive",
+        rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
+    Assert.assertFalse("Node " + node2.toString() + " should not be inactive",
+        rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));
+
+    node2.handle(new RMNodeEvent(node1.getNodeID(), RMNodeEventType.EXPIRE));
+    Assert.assertEquals("Active Nodes", initialActive - 2, cm.getNumActiveNMs());
+    Assert.assertEquals("Lost Nodes", initialLost + 2, cm.getNumLostNMs());
+    Assert.assertEquals("Unhealthy Nodes", initialUnhealthy,
+        cm.getUnhealthyNMs());
+    Assert.assertEquals("Decommissioned Nodes", initialDecommissioned,
+        cm.getNumDecommisionedNMs());
+    Assert.assertEquals("Rebooted Nodes", initialRebooted,
+        cm.getNumRebootedNMs());
+    Assert.assertEquals(NodeState.LOST, node2.getState());
+    Assert.assertTrue("Node " + node1.toString() + " should be inactive",
+        rmContext.getInactiveRMNodes().containsKey(node1.getNodeID()));
+    Assert.assertTrue("Node " + node2.toString() + " should be inactive",
+        rmContext.getInactiveRMNodes().containsKey(node2.getNodeID()));
+  }
+
   @Test
   public void testUnhealthyExpire() {
     RMNodeImpl node = getUnhealthyNode();
@@ -458,14 +499,18 @@ public class TestRMNodeTransitions {
   }
 
   private RMNodeImpl getRunningNode() {
-    return getRunningNode(null);
+    return getRunningNode(null, 0);
   }
 
   private RMNodeImpl getRunningNode(String nmVersion) {
-    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    return getRunningNode(nmVersion, 0);
+  }
+
+  private RMNodeImpl getRunningNode(String nmVersion, int port) {
+    NodeId nodeId = BuilderUtils.newNodeId("localhost", port);
     Resource capability = Resource.newInstance(4096, 4);
-    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
-        null, capability, nmVersion);
+    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null,
+        capability, nmVersion);
     node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java

@@ -173,10 +173,10 @@ public class TestRMWebApp {
     
     final List<RMNode> deactivatedNodes =
         MockNodes.deactivatedNodes(racks, numNodes, newResource(mbsPerNode));
-    final ConcurrentMap<String, RMNode> deactivatedNodesMap =
+    final ConcurrentMap<NodeId, RMNode> deactivatedNodesMap =
         Maps.newConcurrentMap();
     for (RMNode node : deactivatedNodes) {
-      deactivatedNodesMap.put(node.getHostName(), node);
+      deactivatedNodesMap.put(node.getNodeID(), node);
     }
 
     RMContextImpl rmContext = new RMContextImpl(null, null, null, null,
@@ -186,7 +186,7 @@ public class TestRMWebApp {
          return applicationsMaps;
        }
        @Override
-       public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
+       public ConcurrentMap<NodeId, RMNode> getInactiveRMNodes() {
          return deactivatedNodesMap;
        }
        @Override

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesNodes.java

@@ -32,6 +32,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -263,8 +264,9 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
     assertEquals("incorrect number of elements", 2, nodeArray.length());
     for (int i = 0; i < nodeArray.length(); ++i) {
       JSONObject info = nodeArray.getJSONObject(i);
-      String host = info.get("id").toString().split(":")[0];
-      RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(host);
+      String[] node = info.get("id").toString().split(":");
+      NodeId nodeId = NodeId.newInstance(node[0], Integer.parseInt(node[1]));
+      RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
       WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
           info.getString("nodeHTTPAddress"));
       WebServicesTestUtils.checkStringMatch("state", rmNode.getState()
@@ -295,7 +297,8 @@ public class TestRMWebServicesNodes extends JerseyTestBase {
 
     assertEquals("Incorrect Node Information.", "h2:1234", id);
 
-    RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get("h2");
+    NodeId nodeId = NodeId.newInstance("h2", 1234);
+    RMNode rmNode = rm.getRMContext().getInactiveRMNodes().get(nodeId);
     WebServicesTestUtils.checkStringMatch("nodeHTTPAddress", "",
         info.getString("nodeHTTPAddress"));
     WebServicesTestUtils.checkStringMatch("state",