
YARN-2920. Changed CapacityScheduler to kill containers on nodes where node labels are changed. Contributed by Wangda Tan
(cherry picked from commit fdf042dfffa4d2474e3cac86cfb8fe9ee4648beb)

Jian He, 10 years ago
commit 411836b74c
17 changed files with 410 additions and 37 deletions
  1. +2 -1   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  2. +2 -1   hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  3. +3 -0   hadoop-yarn-project/CHANGES.txt
  4. +1 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  5. +23 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java
  6. +4 -2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  7. +22 -2  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  8. +5 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
  9. +8 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
  10. +70 -6  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  11. +14 -12 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  12. +14 -10 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
  13. +9 -2   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
  14. +37 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java
  15. +1 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
  16. +2 -1   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  17. +193 -0 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
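Taken together, the patch adds a one-way notification path from the label store to the scheduler. A minimal sketch of the end-to-end flow, using only names introduced in this commit (the surrounding RM setup is assumed):

    // Hedged sketch: how a label change reaches the CapacityScheduler.
    // `mgr` is an RMNodeLabelsManager that had setRMContext() called on it.
    mgr.replaceLabelsOnNode(
        ImmutableMap.of(NodeId.newInstance("h1", 0), ImmutableSet.of("z")));
    // updateResourceMappings() then collects the new node-to-labels mapping
    // and dispatches it through the RM dispatcher:
    //   new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap)
    // CapacityScheduler routes NODE_LABELS_UPDATE to updateLabelsOnNode(),
    // which kills the node's running containers (KILLED_BY_RESOURCEMANAGER),
    // drops any reservation, and records the new labels on the scheduler node.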

+ 2 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
         .UpdatedContainerInfo;
@@ -162,7 +163,7 @@ public class NodeInfo {
 
     @Override
     public Set<String> getNodeLabels() {
-      return null;
+      return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
   }
 

+ 2 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
         .UpdatedContainerInfo;
@@ -150,6 +151,6 @@ public class RMNodeWrapper implements RMNode {
 
   @Override
   public Set<String> getNodeLabels() {
-    return null;
+    return RMNodeLabelsManager.EMPTY_STRING_SET;
   }
 }

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -231,6 +231,9 @@ Release 2.7.0 - UNRELEASED
 
     YARN-2939. Fix new findbugs warnings in hadoop-yarn-common. (Li Lu via junping_du)
 
+    YARN-2920. Changed CapacityScheduler to kill containers on nodes where
+    node labels are changed. (Wangda Tan via jianhe)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -426,6 +426,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       rmContext.setAMFinishingMonitor(amFinishingMonitor);
       
       RMNodeLabelsManager nlm = createNodeLabelManager();
+      nlm.setRMContext(rmContext);
       addService(nlm);
       rmContext.setNodeLabelManager(nlm);
 

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java

@@ -35,7 +35,10 @@ import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.collect.ImmutableSet;
@@ -57,6 +60,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       new ConcurrentHashMap<String, Queue>();
   protected AccessControlList adminAcl;
   
+  private RMContext rmContext = null;
+  
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
@@ -331,6 +336,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     return map;
   }
 
+  @SuppressWarnings("unchecked")
   private void updateResourceMappings(Map<String, Host> before,
       Map<String, Host> after) {
     // Get NMs in before only
@@ -341,6 +347,10 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     for (Entry<String, Host> entry : after.entrySet()) {
       allNMs.addAll(entry.getValue().nms.keySet());
     }
+    
+    // Map used to notify RM
+    Map<NodeId, Set<String>> newNodeToLabelsMap =
+        new HashMap<NodeId, Set<String>>();
 
     // traverse all nms
     for (NodeId nodeId : allNMs) {
@@ -379,6 +389,9 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
       Node newNM;
       if ((newNM = getNMInNodeSet(nodeId, after, true)) != null) {
         Set<String> newLabels = getLabelsByNode(nodeId, after);
+        
+        newNodeToLabelsMap.put(nodeId, ImmutableSet.copyOf(newLabels));
+        
         // no label in the past
         if (newLabels.isEmpty()) {
           // update labels
@@ -405,6 +418,12 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
         }
       }
     }
+    
+    // Notify RM
+    if (rmContext != null && rmContext.getDispatcher() != null) {
+      rmContext.getDispatcher().getEventHandler().handle(
+          new NodeLabelsUpdateSchedulerEvent(newNodeToLabelsMap));
+    }
   }
   
   public Resource getResourceByLabel(String label, Resource clusterResource) {
@@ -452,4 +471,8 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager {
     }
     return false;
   }
+  
+  public void setRMContext(RMContext rmContext) {
+    this.rmContext = rmContext;
+  }
 }
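The event above fires only when an RMContext (and its dispatcher) has been injected, so standalone uses of the manager stay silent. A sketch of the expected wiring, mirroring what ResourceManager.java does in this patch:

    RMNodeLabelsManager nlm = createNodeLabelManager();
    nlm.setRMContext(rmContext); // must happen before labels change, otherwise
                                 // the null guard silently skips the event
    addService(nlm);
    rmContext.setNodeLabelManager(nlm);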

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -858,9 +859,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   @Override
   public Set<String> getNodeLabels() {
-    if (context.getNodeLabelManager() == null) {
+    RMNodeLabelsManager nlm = context.getNodeLabelManager();
+    if (nlm == null || nlm.getLabelsOnNode(nodeId) == null) {
       return CommonNodeLabelsManager.EMPTY_STRING_SET;
     }
-    return context.getNodeLabelManager().getLabelsOnNode(nodeId);
+    return nlm.getLabelsOnNode(nodeId);
   }
  }

+ 22 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,11 +34,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.ImmutableSet;
+
 
 /**
  * Represents a YARN Cluster Node from the viewpoint of the scheduler.
@@ -61,8 +65,11 @@ public abstract class SchedulerNode {
 
   private final RMNode rmNode;
   private final String nodeName;
-
-  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+  
+  private volatile Set<String> labels = null;
+  
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> labels) {
     this.rmNode = node;
     this.availableResource = Resources.clone(node.getTotalCapability());
     this.totalResourceCapability = Resources.clone(node.getTotalCapability());
@@ -71,6 +78,11 @@ public abstract class SchedulerNode {
     } else {
       nodeName = rmNode.getHostName();
     }
+    this.labels = ImmutableSet.copyOf(labels);
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName) {
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   public RMNode getRMNode() {
@@ -274,4 +286,12 @@ public abstract class SchedulerNode {
     }
     allocateContainer(rmContainer);
   }
+  
+  public Set<String> getLabels() {
+    return labels;
+  }
+  
+  public void updateLabels(Set<String> labels) {
+    this.labels = labels;
+  }
 }
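Note that labels is a volatile reference to an immutable set: readers take a lock-free snapshot and updates swap the whole reference. A hedged sketch of the intended usage (the caller passing an immutable set is the contract assumed here, since updateLabels() stores the reference as-is):

    Set<String> snapshot = schedulerNode.getLabels(); // safe to iterate, no lock
    // on a NODE_LABELS_UPDATE (RMNodeLabelsManager already sends immutable sets):
    schedulerNode.updateLabels(ImmutableSet.copyOf(newLabels)); // atomic swap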

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -447,4 +447,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Map<QueueACL, AccessControlList> getACLs() {
     return acls;
   }
+  
+  @Private
+  public Resource getUsedResourceByLabel(String nodeLabel) {
+    return usedResourcesByNodeLabels.get(nodeLabel);
+  }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -143,6 +143,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public Resource getUsedResources();
   
+  /**
+   * Get the currently utilized resources allocated on nodes with the
+   * specified label.
+   * 
+   * @return resources used by this queue and its children
+   */
+  public Resource getUsedResourceByLabel(String nodeLabel);
+  
   /**
    * Get the current run-state of the queue
    * @return current run-state
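For callers, getUsedResourceByLabel() gives the per-label view that getUsedResources() lacks. A hedged usage sketch (queue name "a" and label "x" are illustrative, taken from the test below):

    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
    CSQueue queue = cs.getQueue("a");
    Resource usedOnX = queue.getUsedResourceByLabel("x");
    Resource usedUnlabeled =
        queue.getUsedResourceByLabel(RMNodeLabelsManager.NO_LABEL);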

+ 70 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,7 +49,9 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -92,6 +96,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptR
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -966,6 +971,51 @@ public class CapacityScheduler extends
     updateNodeResource(nm, resourceOption);
     root.updateClusterResource(clusterResource);
   }
+  
+  /**
+   * Process a node-labels update on a node.
+   * 
+   * TODO: Currently the capacity scheduler kills containers on a node when
+   * the node's labels change. This is a simple way to honor the guaranteed
+   * capacity of queues per label. Once YARN-2498 is complete, the preemption
+   * policy can decide whether such containers need to be killed or can be
+   * kept running.
+   */
+  private synchronized void updateLabelsOnNode(NodeId nodeId,
+      Set<String> newLabels) {
+    FiCaSchedulerNode node = nodes.get(nodeId);
+    if (null == node) {
+      return;
+    }
+    
+    // labels are unchanged, no update needed
+    if (node.getLabels().size() == newLabels.size()
+        && node.getLabels().containsAll(newLabels)) {
+      return;
+    }
+    
+    // Kill running containers since the node's labels changed
+    for (RMContainer rmContainer : node.getRunningContainers()) {
+      ContainerId containerId = rmContainer.getContainerId();
+      completedContainer(rmContainer, 
+          ContainerStatus.newInstance(containerId,
+              ContainerState.COMPLETE, 
+              String.format(
+                  "Container=%s killed since labels on the node=%s changed",
+                  containerId.toString(), nodeId.toString()),
+              ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+          RMContainerEventType.KILL);
+    }
+    
+    // Unreserve container on this node
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (null != reservedContainer) {
+      dropContainerReservation(reservedContainer);
+    }
+    
+    // Update the node's labels after the cleanup above
+    node.updateLabels(newLabels);
+  }
 
   private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
     if (rmContext.isWorkPreservingRecoveryEnabled()
@@ -1049,6 +1099,19 @@ public class CapacityScheduler extends
         nodeResourceUpdatedEvent.getResourceOption());
     }
     break;
+    case NODE_LABELS_UPDATE:
+    {
+      NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
+          (NodeLabelsUpdateSchedulerEvent) event;
+      
+      for (Entry<NodeId, Set<String>> entry : labelUpdateEvent
+          .getUpdatedNodeToLabels().entrySet()) {
+        NodeId id = entry.getKey();
+        Set<String> labels = entry.getValue();
+        updateLabelsOnNode(id, labels);
+      }
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
@@ -1117,13 +1180,8 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
-    // update this node to node label manager
-    if (labelManager != null) {
-      labelManager.activateNode(nodeManager.getNodeID(),
-          nodeManager.getTotalCapability());
-    }
     FiCaSchedulerNode schedulerNode = new FiCaSchedulerNode(nodeManager,
-        usePortForNodeName);
+        usePortForNodeName, nodeManager.getNodeLabels());
     this.nodes.put(nodeManager.getNodeID(), schedulerNode);
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
@@ -1136,6 +1194,12 @@ public class CapacityScheduler extends
     if (scheduleAsynchronously && numNodes == 1) {
       asyncSchedulerThread.beginSchedule();
     }
+    
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
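Also note the reordering in addNode(): the FiCaSchedulerNode, now constructed with the node's current labels, is registered with the scheduler before labelManager.activateNode() runs, presumably so that any NodeLabelsUpdateSchedulerEvent fired during activation can already resolve the node. A condensed sketch of the new ordering:

    // after this patch (ordering only, not the full method):
    nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(
        nodeManager, usePortForNodeName, nodeManager.getNodeLabels()));
    // ... cluster resource and queue bookkeeping ...
    labelManager.activateNode(nodeManager.getNodeID(),
        nodeManager.getTotalCapability()); // now last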

+ 14 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -730,7 +730,7 @@ public class LeafQueue extends AbstractCSQueue {
     
     // if our queue cannot access this node, just return
     if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+        node.getLabels())) {
       return NULL_ASSIGNMENT;
     }
     
@@ -799,7 +799,7 @@ public class LeafQueue extends AbstractCSQueue {
           
           // Check queue max-capacity limit
           if (!canAssignToThisQueue(clusterResource, required,
-              labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
+              node.getLabels(), application, true)) {
             return NULL_ASSIGNMENT;
           }
 
@@ -832,7 +832,7 @@ public class LeafQueue extends AbstractCSQueue {
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                labelManager.getLabelsOnNode(node.getNodeID()));
+                node.getLabels());
             
             // Don't reset scheduling opportunities for non-local assignments
             // otherwise the app will be delayed for each non-local assignment.
@@ -1478,7 +1478,7 @@ public class LeafQueue extends AbstractCSQueue {
     
     // check if the resource request can access the label
     if (!SchedulerUtils.checkNodeLabelExpression(
-        labelManager.getLabelsOnNode(node.getNodeID()),
+        node.getLabels(),
         request.getNodeLabelExpression())) {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
@@ -1669,8 +1669,7 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
           releaseResource(clusterResource, application,
-              container.getResource(),
-              labelManager.getLabelsOnNode(node.getNodeID()));
+              container.getResource(), node.getLabels());
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1862,9 +1861,10 @@ public class LeafQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important! 
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1901,9 +1901,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1918,9 +1919,10 @@ public class LeafQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
-          .getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()

+ 14 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
   private final boolean rootQueue;
   final Comparator<CSQueue> queueComparator;
   volatile int numApplications;
+  private final CapacitySchedulerContext scheduler;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -80,7 +81,7 @@ public class ParentQueue extends AbstractCSQueue {
   public ParentQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
-    
+    this.scheduler = cs;
     this.queueComparator = cs.getQueueComparator();
 
     this.rootQueue = (parent == null);
@@ -420,10 +421,10 @@ public class ParentQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+    Set<String> nodeLabels = node.getLabels();
     
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        labelManager.getLabelsOnNode(node.getNodeID()))) {
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
       return assignment;
     }
     
@@ -434,7 +435,6 @@ public class ParentQueue extends AbstractCSQueue {
       }
       
       boolean localNeedToUnreserve = false;
-      Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); 
       
       // Are we over maximum-capacity for this queue?
       if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
@@ -641,7 +641,7 @@ public class ParentQueue extends AbstractCSQueue {
       // Book keeping
       synchronized (this) {
         super.releaseResource(clusterResource, rmContainer.getContainer()
-            .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
+            .getResource(), node.getLabels());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -703,9 +703,10 @@ public class ParentQueue extends AbstractCSQueue {
     }
     // Careful! Locking order is important! 
     synchronized (this) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -730,9 +731,10 @@ public class ParentQueue extends AbstractCSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), labelManager.getLabelsOnNode(rmContainer
-          .getContainer().getNodeId()));
+          .getResource(), node.getLabels());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -748,9 +750,11 @@ public class ParentQueue extends AbstractCSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
+      FiCaSchedulerNode node =
+          scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
+          node.getLabels());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="

+ 9 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -19,12 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -32,9 +34,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 public class FiCaSchedulerNode extends SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerNode.class);
+  
+  public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> nodeLabels) {
+    super(node, usePortForNodeName, nodeLabels);
+  }
 
   public FiCaSchedulerNode(RMNode node, boolean usePortForNodeName) {
-    super(node, usePortForNodeName);
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
   }
 
   @Override

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeLabelsUpdateSchedulerEvent.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NodeLabelsUpdateSchedulerEvent extends SchedulerEvent {
+  private Map<NodeId, Set<String>> nodeToLabels;
+
+  public NodeLabelsUpdateSchedulerEvent(Map<NodeId, Set<String>> nodeToLabels) {
+    super(SchedulerEventType.NODE_LABELS_UPDATE);
+    this.nodeToLabels = nodeToLabels;
+  }
+  
+  public Map<NodeId, Set<String>> getUpdatedNodeToLabels() {
+    return nodeToLabels;
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java

@@ -25,6 +25,7 @@ public enum SchedulerEventType {
   NODE_REMOVED,
   NODE_UPDATE,
   NODE_RESOURCE_UPDATE,
+  NODE_LABELS_UPDATE,
 
   // Source: RMApp
   APP_ADDED,

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -206,7 +207,7 @@ public class MockNodes {
 
     @Override
     public Set<String> getNodeLabels() {
-      return null;
+      return RMNodeLabelsManager.EMPTY_STRING_SET;
     }
   };
 

+ 193 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java

@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.MemoryRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestCapacitySchedulerNodeLabelUpdate {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+  
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    mgr = new MemoryRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+  
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 100);
+    conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
+    conf.setCapacityByLabel(A, "x", 100);
+    conf.setCapacityByLabel(A, "y", 100);
+    conf.setCapacityByLabel(A, "z", 100);
+    
+    return conf;
+  }
+  
+  private Set<String> toSet(String... elements) {
+    Set<String> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  private void checkUsedResource(MockRM rm, String queueName, int memory) {
+    checkUsedResource(rm, queueName, memory, RMNodeLabelsManager.NO_LABEL);
+  }
+  
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = scheduler.getQueue(queueName);
+    Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
+  }
+
+  @Test (timeout = 30000)
+  public void testNodeUpdate() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+    
+    // set mapping:
+    // h1 -> x
+    // h2 -> y
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 8000);
+    MockNM nm2 = rm.registerNode("h2:1234", 8000);
+    MockNM nm3 = rm.registerNode("h3:1234", 8000);
+    
+    ContainerId containerId;
+
+    // launch an app in queue a (accessible labels include x) and check that
+    // containers requested with label x are allocated on h1
+    RMApp app1 = rm.submitApp(GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm3);
+
+    // request a container.
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=1G, ""=1G
+    checkUsedResource(rm, "a", 1024, "x");
+    checkUsedResource(rm, "a", 1024);
+    
+    // change h1's label to z, container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("z")));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, ""=1G ("" not changed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 1024);
+    
+    // request a container with label = y
+    am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used y=1G, ""=1G
+    checkUsedResource(rm, "a", 1024, "y");
+    checkUsedResource(rm, "a", 1024);
+    
+    // change h2's label to no label, container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
+        CommonNodeLabelsManager.EMPTY_STRING_SET));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 1024);
+    
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    
+    // change h3's label to z, AM container should be killed
+    mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
+        toSet("z")));
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.KILLED, 10 * 1000));
+    
+    // check used resource:
+    // queue-a used x=0G, y=0G, ""=0G (AM container killed)
+    checkUsedResource(rm, "a", 0, "x");
+    checkUsedResource(rm, "a", 0, "y");
+    checkUsedResource(rm, "a", 0);
+
+    rm.close();
+  }
+}