Browse Source

YARN-5181. ClusterNodeTracker: add method to get list of nodes matching a specific resourceName. (kasha via asuresh)

Arun Suresh 8 năm trước cách đây
mục cha
commit
e905a42a2c

+ 45 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java

@@ -18,11 +18,13 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -50,7 +52,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   private Lock writeLock = readWriteLock.writeLock();
 
   private HashMap<NodeId, N> nodes = new HashMap<>();
-  private Map<String, Integer> nodesPerRack = new HashMap<>();
+  private Map<String, N> nodeNameToNodeMap = new HashMap<>();
+  private Map<String, List<N>> nodesPerRack = new HashMap<>();
 
   private Resource clusterCapacity = Resources.clone(Resources.none());
   private Resource staleClusterCapacity = null;
@@ -66,14 +69,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     writeLock.lock();
     try {
       nodes.put(node.getNodeID(), node);
+      nodeNameToNodeMap.put(node.getNodeName(), node);
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes == null) {
-        numNodes = 0;
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
+        nodesList = new ArrayList<>();
+        nodesPerRack.put(rackName, nodesList);
       }
-      nodesPerRack.put(rackName, ++numNodes);
+      nodesList.add(node);
 
       // Update cluster capacity
       Resources.addTo(clusterCapacity, node.getTotalResource());
@@ -126,8 +131,8 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     readLock.lock();
     String rName = rackName == null ? "NULL" : rackName;
     try {
-      Integer nodeCount = nodesPerRack.get(rName);
-      return nodeCount == null ? 0 : nodeCount;
+      List<N> nodesList = nodesPerRack.get(rName);
+      return nodesList == null ? 0 : nodesList.size();
     } finally {
       readLock.unlock();
     }
@@ -154,14 +159,18 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
         LOG.warn("Attempting to remove a non-existent node " + nodeId);
         return null;
       }
+      nodeNameToNodeMap.remove(node.getNodeName());
 
       // Update nodes per rack as well
       String rackName = node.getRackName();
-      Integer numNodes = nodesPerRack.get(rackName);
-      if (numNodes > 0) {
-        nodesPerRack.put(rackName, --numNodes);
-      } else {
+      List<N> nodesList = nodesPerRack.get(rackName);
+      if (nodesList == null) {
         LOG.error("Attempting to remove node from an empty rack " + rackName);
+      } else {
+        nodesList.remove(node);
+        if (nodesList.isEmpty()) {
+          nodesPerRack.remove(rackName);
+        }
       }
 
       // Update cluster capacity
@@ -297,4 +306,29 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     Collections.sort(sortedList, comparator);
     return sortedList;
   }
+
+  /**
+   * Convenience method to return list of nodes corresponding to resourceName
+   * passed in the {@link ResourceRequest}.
+   *
+   * @param resourceName Host/rack name of the resource, or
+   * {@link ResourceRequest#ANY}
+   * @return list of nodes that match the resourceName
+   */
+  public List<N> getNodesByResourceName(final String resourceName) {
+    Preconditions.checkArgument(
+        resourceName != null && !resourceName.isEmpty());
+    List<N> retNodes = new ArrayList<>();
+    if (ResourceRequest.ANY.equals(resourceName)) {
+      return getAllNodes();
+    } else if (nodeNameToNodeMap.containsKey(resourceName)) {
+      retNodes.add(nodeNameToNodeMap.get(resourceName));
+    } else if (nodesPerRack.containsKey(resourceName)) {
+      return nodesPerRack.get(resourceName);
+    } else {
+      LOG.info(
+          "Could not find a node matching given resourceName " + resourceName);
+    }
+    return retNodes;
+  }
 }

+ 69 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestClusterNodeTracker.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test class to verify ClusterNodeTracker. Using FSSchedulerNode without
+ * loss of generality.
+ */
+public class TestClusterNodeTracker {
+  private ClusterNodeTracker<FSSchedulerNode> nodeTracker =
+      new ClusterNodeTracker<>();
+
+  @Before
+  public void setup() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(2, 4, Resource.newInstance(4096, 4));
+    for (RMNode rmNode : rmNodes) {
+      nodeTracker.addNode(new FSSchedulerNode(rmNode, false));
+    }
+  }
+
+  @Test
+  public void testGetNodeCount() {
+    assertEquals("Incorrect number of nodes in the cluster",
+        8, nodeTracker.nodeCount());
+
+    assertEquals("Incorrect number of nodes in each rack",
+        4, nodeTracker.nodeCount("rack0"));
+  }
+
+  @Test
+  public void testGetNodesForResourceName() throws Exception {
+    assertEquals("Incorrect number of nodes matching ANY",
+        8, nodeTracker.getNodesByResourceName(ResourceRequest.ANY).size());
+
+    assertEquals("Incorrect number of nodes matching rack",
+        4, nodeTracker.getNodesByResourceName("rack0").size());
+
+    assertEquals("Incorrect number of nodes matching node",
+        1, nodeTracker.getNodesByResourceName("host0").size());
+  }
+}