Kaynağa Gözat

YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail when using ephemeral ports on NodeIDs. Contributed by Wangda Tan.

(cherry picked from commit abae63caf9c53b404f2f2db7d482555484eaeaf8)
Vinod Kumar Vavilapalli 10 yıl önce
ebeveyn
işleme
99ce4277a8

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -564,6 +564,9 @@ Release 2.6.0 - UNRELEASED
     tracking per label when a host runs multiple node-managers. (Wangda Tan via
     tracking per label when a host runs multiple node-managers. (Wangda Tan via
     vinodkv)
     vinodkv)
 
 
+    YARN-2699. Fixed a bug in CommonNodeLabelsManager that caused tests to fail
+    when using ephemeral ports on NodeIDs. (Wangda Tan via vinodkv)
+
   BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
   BREAKDOWN OF YARN-1051 SUBTASKS AND RELATED JIRAS
 
 
     YARN-1707. Introduce APIs to add/remove/resize queues in the
     YARN-1707. Introduce APIs to add/remove/resize queues in the

+ 19 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/nodelabels/CommonNodeLabelsManager.java

@@ -299,14 +299,14 @@ public class CommonNodeLabelsManager extends AbstractService {
     for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
     for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
       NodeId nodeId = entry.getKey();
       NodeId nodeId = entry.getKey();
       Set<String> labels = entry.getValue();
       Set<String> labels = entry.getValue();
-      
-      createNodeIfNonExisted(entry.getKey());
-      
+ 
+      createHostIfNonExisted(nodeId.getHost());
       if (nodeId.getPort() == WILDCARD_PORT) {
       if (nodeId.getPort() == WILDCARD_PORT) {
         Host host = nodeCollections.get(nodeId.getHost());
         Host host = nodeCollections.get(nodeId.getHost());
         host.labels.addAll(labels);
         host.labels.addAll(labels);
         newNMToLabels.put(nodeId, host.labels);
         newNMToLabels.put(nodeId, host.labels);
       } else {
       } else {
+        createNodeIfNonExisted(nodeId);
         Node nm = getNMInNodeSet(nodeId);
         Node nm = getNMInNodeSet(nodeId);
         if (nm.labels == null) {
         if (nm.labels == null) {
           nm.labels = new HashSet<String>();
           nm.labels = new HashSet<String>();
@@ -534,21 +534,21 @@ public class CommonNodeLabelsManager extends AbstractService {
   
   
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   protected void internalReplaceLabelsOnNode(
   protected void internalReplaceLabelsOnNode(
-      Map<NodeId, Set<String>> replaceLabelsToNode) {
+      Map<NodeId, Set<String>> replaceLabelsToNode) throws IOException {
     // do replace labels to nodes
     // do replace labels to nodes
     Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
     Map<NodeId, Set<String>> newNMToLabels = new HashMap<NodeId, Set<String>>();
     for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
     for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
       NodeId nodeId = entry.getKey();
       NodeId nodeId = entry.getKey();
       Set<String> labels = entry.getValue();
       Set<String> labels = entry.getValue();
 
 
-      // update nodeCollections
-      createNodeIfNonExisted(entry.getKey());
+      createHostIfNonExisted(nodeId.getHost());      
       if (nodeId.getPort() == WILDCARD_PORT) {
       if (nodeId.getPort() == WILDCARD_PORT) {
         Host host = nodeCollections.get(nodeId.getHost());
         Host host = nodeCollections.get(nodeId.getHost());
         host.labels.clear();
         host.labels.clear();
         host.labels.addAll(labels);
         host.labels.addAll(labels);
         newNMToLabels.put(nodeId, host.labels);
         newNMToLabels.put(nodeId, host.labels);
       } else {
       } else {
+        createNodeIfNonExisted(nodeId);
         Node nm = getNMInNodeSet(nodeId);
         Node nm = getNMInNodeSet(nodeId);
         if (nm.labels == null) {
         if (nm.labels == null) {
           nm.labels = new HashSet<String>();
           nm.labels = new HashSet<String>();
@@ -672,10 +672,6 @@ public class CommonNodeLabelsManager extends AbstractService {
 
 
   protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
   protected Node getNMInNodeSet(NodeId nodeId, Map<String, Host> map,
       boolean checkRunning) {
       boolean checkRunning) {
-    if (WILDCARD_PORT == nodeId.getPort()) {
-      return null;
-    }
-    
     Host host = map.get(nodeId.getHost());
     Host host = map.get(nodeId.getHost());
     if (null == host) {
     if (null == host) {
       return null;
       return null;
@@ -707,17 +703,22 @@ public class CommonNodeLabelsManager extends AbstractService {
     }
     }
   }
   }
   
   
-  protected void createNodeIfNonExisted(NodeId nodeId) {
+  protected void createNodeIfNonExisted(NodeId nodeId) throws IOException {
     Host host = nodeCollections.get(nodeId.getHost());
     Host host = nodeCollections.get(nodeId.getHost());
     if (null == host) {
     if (null == host) {
-      host = new Host();
-      nodeCollections.put(nodeId.getHost(), host);
+      throw new IOException("Should create host before creating node.");
     }
     }
-    if (nodeId.getPort() != WILDCARD_PORT) {
-      Node nm = host.nms.get(nodeId);
-      if (null == nm) {
-        host.nms.put(nodeId, new Node());
-      }
+    Node nm = host.nms.get(nodeId);
+    if (null == nm) {
+      host.nms.put(nodeId, new Node());
+    }
+  }
+  
+  protected void createHostIfNonExisted(String hostName) {
+    Host host = nodeCollections.get(hostName);
+    if (null == host) {
+      host = new Host();
+      nodeCollections.put(hostName, host);
     }
     }
   }
   }
 }
 }

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java

@@ -181,7 +181,15 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       // save if we have a node before
       // save if we have a node before
       Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
       Map<String, Host> before = cloneNodeMap(ImmutableSet.of(nodeId));
       
       
-      createNodeIfNonExisted(nodeId);
+      createHostIfNonExisted(nodeId.getHost());
+      try {
+        createNodeIfNonExisted(nodeId);
+      } catch (IOException e) {
+        LOG.error("This shouldn't happen, cannot get host in nodeCollection"
+            + " associated to the node being activated");
+        return;
+      }
+
       Node nm = getNMInNodeSet(nodeId);
       Node nm = getNMInNodeSet(nodeId);
       nm.resource = resource;
       nm.resource = resource;
       nm.running = true;
       nm.running = true;
@@ -220,7 +228,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
     }
   }
   }
 
 
-  public void updateNodeResource(NodeId node, Resource newResource) {
+  public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
     deactivateNode(node);
     deactivateNode(node);
     activateNode(node, newResource);
     activateNode(node, newResource);
   }
   }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/TestRMNodeLabelsManager.java

@@ -96,6 +96,14 @@ public class TestRMNodeLabelsManager extends NodeLabelTestBase {
     Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
     Assert.assertEquals(mgr.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, null),
         Resources.add(SMALL_RESOURCE, LARGE_NODE));
         Resources.add(SMALL_RESOURCE, LARGE_NODE));
   }
   }
+  
+  @Test(timeout = 5000)
+  public void testActivateNodeManagerWithZeroPort() throws Exception {
+    // active two NM, one is zero port , another is non-zero port. no exception
+    // should be raised
+    mgr.activateNode(NodeId.newInstance("n1", 0), SMALL_RESOURCE);
+    mgr.activateNode(NodeId.newInstance("n1", 2), LARGE_NODE);
+  }
 
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test(timeout = 5000)
   @Test(timeout = 5000)