فهرست منبع

YARN-6467. CSQueueMetrics needs to update the current metrics for default partition only. Contributed by Manikandan R.

Naganarasimha 8 سال پیش
والد
کامیت
29c68ccc6f
11 فایلهای تغییر یافته به همراه 263 افزوده شده و 157 حذف شده
  1. 20 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  2. 136 79
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
  3. 8 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  4. 26 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
  5. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
  6. 11 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  7. 8 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  8. 5 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  9. 41 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java
  10. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
  11. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

+ 20 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -31,7 +31,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -403,10 +402,13 @@ public class AppSchedulingInfo {
 
 
     Resource lastRequestCapability =
     Resource lastRequestCapability =
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
         lastRequest != null ? lastRequest.getCapability() : Resources.none();
-    metrics.incrPendingResources(user,
+    metrics.incrPendingResources(request.getNodeLabelExpression(), user,
         request.getNumContainers(), request.getCapability());
         request.getNumContainers(), request.getCapability());
-    metrics.decrPendingResources(user,
-        lastRequestContainers, lastRequestCapability);
+
+    if(lastRequest != null) {
+      metrics.decrPendingResources(lastRequest.getNodeLabelExpression(), user,
+          lastRequestContainers, lastRequestCapability);
+    }
 
 
     // update queue:
     // update queue:
     Resource increasedResource =
     Resource increasedResource =
@@ -587,7 +589,8 @@ public class AppSchedulingInfo {
           + deltaCapacity);
           + deltaCapacity);
     }
     }
     // Set queue metrics
     // Set queue metrics
-    queue.getMetrics().allocateResources(user, deltaCapacity);
+    queue.getMetrics().allocateResources(increaseRequest.getNodePartition(),
+        user, deltaCapacity);
     // remove the increase request from pending increase request map
     // remove the increase request from pending increase request map
     removeIncreaseRequest(nodeId, priority, containerId);
     removeIncreaseRequest(nodeId, priority, containerId);
     // update usage
     // update usage
@@ -607,7 +610,8 @@ public class AppSchedulingInfo {
     }
     }
     
     
     // Set queue metrics
     // Set queue metrics
-    queue.getMetrics().releaseResources(user, absDelta);
+    queue.getMetrics().releaseResources(decreaseRequest.getNodePartition(),
+        user, absDelta);
 
 
     // update usage
     // update usage
     appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
     appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
@@ -644,7 +648,8 @@ public class AppSchedulingInfo {
           + " resource=" + request.getCapability()
           + " resource=" + request.getCapability()
           + " type=" + type);
           + " type=" + type);
     }
     }
-    metrics.allocateResources(user, 1, request.getCapability(), true);
+    metrics.allocateResources(node.getPartition(),
+        user, 1, request.getCapability(), true);
     metrics.incrNodeTypeAggregations(user, type);
     metrics.incrNodeTypeAggregations(user, type);
     return resourceRequests;
     return resourceRequests;
   }
   }
@@ -744,9 +749,11 @@ public class AppSchedulingInfo {
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
       if (request != null) {
-        oldMetrics.decrPendingResources(user, request.getNumContainers(),
+        oldMetrics.decrPendingResources(request.getNodeLabelExpression(),
+            user, request.getNumContainers(),
             request.getCapability());
             request.getCapability());
-        newMetrics.incrPendingResources(user, request.getNumContainers(),
+        newMetrics.incrPendingResources(request.getNodeLabelExpression(),
+            user, request.getNumContainers(),
             request.getCapability());
             request.getCapability());
         
         
         Resource delta = Resources.multiply(request.getCapability(),
         Resource delta = Resources.multiply(request.getCapability(),
@@ -770,7 +777,8 @@ public class AppSchedulingInfo {
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
     for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       ResourceRequest request = asks.get(ResourceRequest.ANY);
       if (request != null) {
       if (request != null) {
-        metrics.decrPendingResources(user, request.getNumContainers(),
+        metrics.decrPendingResources(request.getNodeLabelExpression(),
+            user, request.getNumContainers(),
             request.getCapability());
             request.getCapability());
         
         
         // Update Queue
         // Update Queue
@@ -820,8 +828,8 @@ public class AppSchedulingInfo {
     if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
     if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
       return;
       return;
     }
     }
-
-    metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
+    metrics.allocateResources(rmContainer.getNodeLabelExpression(),
+        user, 1, rmContainer.getAllocatedResource(),
       false);
       false);
   }
   }
   
   

+ 136 - 79
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
@@ -59,36 +60,45 @@ public class QueueMetrics implements MetricsSource {
   @Metric("# of apps completed") MutableCounterInt appsCompleted;
   @Metric("# of apps completed") MutableCounterInt appsCompleted;
   @Metric("# of apps killed") MutableCounterInt appsKilled;
   @Metric("# of apps killed") MutableCounterInt appsKilled;
   @Metric("# of apps failed") MutableCounterInt appsFailed;
   @Metric("# of apps failed") MutableCounterInt appsFailed;
-
-  @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
-  @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
-  @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
-  @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated;
   @Metric("Aggregate # of allocated node-local containers")
   @Metric("Aggregate # of allocated node-local containers")
     MutableCounterLong aggregateNodeLocalContainersAllocated;
     MutableCounterLong aggregateNodeLocalContainersAllocated;
   @Metric("Aggregate # of allocated rack-local containers")
   @Metric("Aggregate # of allocated rack-local containers")
     MutableCounterLong aggregateRackLocalContainersAllocated;
     MutableCounterLong aggregateRackLocalContainersAllocated;
   @Metric("Aggregate # of allocated off-switch containers")
   @Metric("Aggregate # of allocated off-switch containers")
     MutableCounterLong aggregateOffSwitchContainersAllocated;
     MutableCounterLong aggregateOffSwitchContainersAllocated;
-  @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased;
+  @Metric("Aggregate # of preempted containers") MutableCounterLong
+      aggregateContainersPreempted;
+  @Metric("# of active users") MutableGaugeInt activeUsers;
+  @Metric("# of active applications") MutableGaugeInt activeApplications;
+  @Metric("App Attempt First Container Allocation Delay")
+    MutableRate appAttemptFirstContainerAllocationDelay;
+
+  //Metrics updated only for "default" partition
+  @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB;
+  @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores;
+  @Metric("# of allocated containers") MutableGaugeInt allocatedContainers;
+  @Metric("Aggregate # of allocated containers")
+    MutableCounterLong aggregateContainersAllocated;
+  @Metric("Aggregate # of released containers")
+    MutableCounterLong aggregateContainersReleased;
   @Metric("Available memory in MB") MutableGaugeLong availableMB;
   @Metric("Available memory in MB") MutableGaugeLong availableMB;
   @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
   @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores;
   @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
   @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB;
-  @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores;
+  @Metric("Pending CPU allocation in virtual cores")
+    MutableGaugeInt pendingVCores;
   @Metric("# of pending containers") MutableGaugeInt pendingContainers;
   @Metric("# of pending containers") MutableGaugeInt pendingContainers;
   @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
   @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB;
   @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
   @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
   @Metric("# of reserved containers") MutableGaugeInt reservedContainers;
-  @Metric("# of active users") MutableGaugeInt activeUsers;
-  @Metric("# of active applications") MutableGaugeInt activeApplications;
-  @Metric("App Attempt First Container Allocation Delay") MutableRate appAttemptFirstContainerAllocationDelay;
+
   private final MutableGaugeInt[] runningTime;
   private final MutableGaugeInt[] runningTime;
   private TimeBucketMetrics<ApplicationId> runBuckets;
   private TimeBucketMetrics<ApplicationId> runBuckets;
 
 
   static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
   static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class);
   static final MetricsInfo RECORD_INFO = info("QueueMetrics",
   static final MetricsInfo RECORD_INFO = info("QueueMetrics",
       "Metrics for the resource scheduler");
       "Metrics for the resource scheduler");
-  protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue");
+  protected static final MetricsInfo QUEUE_INFO =
+      info("Queue", "Metrics by queue");
   protected static final MetricsInfo USER_INFO =
   protected static final MetricsInfo USER_INFO =
       info("User", "Metrics by user");
       info("User", "Metrics by user");
   static final Splitter Q_SPLITTER =
   static final Splitter Q_SPLITTER =
@@ -320,44 +330,64 @@ public class QueueMetrics implements MetricsSource {
     }
     }
   }
   }
 
 
+  /**
+   * Set available resources. To be called by scheduler periodically as
+   * resources become available.
+   * @param partition Node Partition
+   * @param limit resource limit
+   */
+  public void setAvailableResourcesToQueue(String partition, Resource limit) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      availableMB.set(limit.getMemorySize());
+      availableVCores.set(limit.getVirtualCores());
+    }
+  }
+
   /**
   /**
    * Set available resources. To be called by scheduler periodically as
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
    * resources become available.
    * @param limit resource limit
    * @param limit resource limit
    */
    */
   public void setAvailableResourcesToQueue(Resource limit) {
   public void setAvailableResourcesToQueue(Resource limit) {
-    availableMB.set(limit.getMemorySize());
-    availableVCores.set(limit.getVirtualCores());
+    this.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, limit);
   }
   }
 
 
   /**
   /**
    * Set available resources. To be called by scheduler periodically as
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
    * resources become available.
+   * @param partition Node Partition
    * @param user
    * @param user
    * @param limit resource limit
    * @param limit resource limit
    */
    */
-  public void setAvailableResourcesToUser(String user, Resource limit) {
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.setAvailableResourcesToQueue(limit);
+  public void setAvailableResourcesToUser(String partition,
+      String user, Resource limit) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.setAvailableResourcesToQueue(partition, limit);
+      }
     }
     }
   }
   }
 
 
   /**
   /**
    * Increment pending resource metrics
    * Increment pending resource metrics
+   * @param partition Node Partition
    * @param user
    * @param user
    * @param containers
    * @param containers
    * @param res the TOTAL delta of resources note this is different from
    * @param res the TOTAL delta of resources note this is different from
    *            the other APIs which use per container resource
    *            the other APIs which use per container resource
    */
    */
-  public void incrPendingResources(String user, int containers, Resource res) {
-    _incrPendingResources(containers, res);
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.incrPendingResources(user, containers, res);
-    }
-    if (parent != null) {
-      parent.incrPendingResources(user, containers, res);
+  public void incrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      _incrPendingResources(containers, res);
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.incrPendingResources(partition, user, containers, res);
+      }
+      if (parent != null) {
+        parent.incrPendingResources(partition, user, containers, res);
+      }
     }
     }
   }
   }
 
 
@@ -367,14 +397,18 @@ public class QueueMetrics implements MetricsSource {
     pendingVCores.incr(res.getVirtualCores() * containers);
     pendingVCores.incr(res.getVirtualCores() * containers);
   }
   }
 
 
-  public void decrPendingResources(String user, int containers, Resource res) {
-    _decrPendingResources(containers, res);
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.decrPendingResources(user, containers, res);
-    }
-    if (parent != null) {
-      parent.decrPendingResources(user, containers, res);
+
+  public void decrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      _decrPendingResources(containers, res);
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.decrPendingResources(partition, user, containers, res);
+      }
+      if (parent != null) {
+        parent.decrPendingResources(partition, user, containers, res);
+      }
     }
     }
   }
   }
 
 
@@ -403,76 +437,93 @@ public class QueueMetrics implements MetricsSource {
     }
     }
   }
   }
 
 
-  public void allocateResources(String user, int containers, Resource res,
-      boolean decrPending) {
-    allocatedContainers.incr(containers);
-    aggregateContainersAllocated.incr(containers);
+  public void allocateResources(String partition, String user,
+      int containers, Resource res, boolean decrPending) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      allocatedContainers.incr(containers);
+      aggregateContainersAllocated.incr(containers);
 
 
-    allocatedMB.incr(res.getMemorySize() * containers);
-    allocatedVCores.incr(res.getVirtualCores() * containers);
-    if (decrPending) {
-      _decrPendingResources(containers, res);
-    }
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.allocateResources(user, containers, res, decrPending);
-    }
-    if (parent != null) {
-      parent.allocateResources(user, containers, res, decrPending);
+      allocatedMB.incr(res.getMemorySize() * containers);
+      allocatedVCores.incr(res.getVirtualCores() * containers);
+      if (decrPending) {
+        _decrPendingResources(containers, res);
+      }
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.allocateResources(partition, user,
+            containers, res, decrPending);
+      }
+      if (parent != null) {
+        parent.allocateResources(partition, user, containers, res, decrPending);
+      }
     }
     }
   }
   }
 
 
   /**
   /**
    * Allocate Resource for container size change.
    * Allocate Resource for container size change.
-   *
+   * @param partition Node Partition
    * @param user
    * @param user
    * @param res
    * @param res
    */
    */
-  public void allocateResources(String user, Resource res) {
-    allocatedMB.incr(res.getMemorySize());
-    allocatedVCores.incr(res.getVirtualCores());
+  public void allocateResources(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      allocatedMB.incr(res.getMemorySize());
+      allocatedVCores.incr(res.getVirtualCores());
 
 
-    pendingMB.decr(res.getMemorySize());
-    pendingVCores.decr(res.getVirtualCores());
+      pendingMB.decr(res.getMemorySize());
+      pendingVCores.decr(res.getVirtualCores());
 
 
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.allocateResources(user, res);
-    }
-    if (parent != null) {
-      parent.allocateResources(user, res);
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.allocateResources(partition, user, res);
+      }
+      if (parent != null) {
+        parent.allocateResources(partition, user, res);
+      }
     }
     }
   }
   }
 
 
-  public void releaseResources(String user, int containers, Resource res) {
-    allocatedContainers.decr(containers);
-    aggregateContainersReleased.incr(containers);
-    allocatedMB.decr(res.getMemorySize() * containers);
-    allocatedVCores.decr(res.getVirtualCores() * containers);
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.releaseResources(user, containers, res);
-    }
-    if (parent != null) {
-      parent.releaseResources(user, containers, res);
+  public void releaseResources(String partition,
+      String user, int containers, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      allocatedContainers.decr(containers);
+      aggregateContainersReleased.incr(containers);
+      allocatedMB.decr(res.getMemorySize() * containers);
+      allocatedVCores.decr(res.getVirtualCores() * containers);
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.releaseResources(partition, user, containers, res);
+      }
+      if (parent != null) {
+        parent.releaseResources(partition, user, containers, res);
+      }
     }
     }
   }
   }
 
 
   /**
   /**
    * Release Resource for container size change.
    * Release Resource for container size change.
    *
    *
+   * @param partition Node Partition
    * @param user
    * @param user
    * @param res
    * @param res
    */
    */
-  public void releaseResources(String user, Resource res) {
-    allocatedMB.decr(res.getMemorySize());
-    allocatedVCores.decr(res.getVirtualCores());
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.releaseResources(user, res);
+  public void releaseResources(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      allocatedMB.decr(res.getMemorySize());
+      allocatedVCores.decr(res.getVirtualCores());
+      QueueMetrics userMetrics = getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.releaseResources(partition, user, res);
+      }
+      if (parent != null) {
+        parent.releaseResources(partition, user, res);
+      }
     }
     }
-    if (parent != null) {
-      parent.releaseResources(user, res);
+  }
+
+  public void reserveResource(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      reserveResource(user, res);
     }
     }
   }
   }
 
 
@@ -502,6 +553,12 @@ public class QueueMetrics implements MetricsSource {
     }
     }
   }
   }
 
 
+  public void unreserveResource(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      unreserveResource(user, res);
+    }
+  }
+
   public void incrActiveUsers() {
   public void incrActiveUsers() {
     activeUsers.incr();
     activeUsers.incr();
   }
   }

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -756,15 +756,19 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     for (RMContainer liveContainer : liveContainers.values()) {
     for (RMContainer liveContainer : liveContainers.values()) {
       Resource resource = liveContainer.getContainer().getResource();
       Resource resource = liveContainer.getContainer().getResource();
       ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
       ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
-      oldMetrics.releaseResources(user, 1, resource);
-      newMetrics.allocateResources(user, 1, resource, false);
+      oldMetrics.releaseResources(liveContainer.getNodeLabelExpression(),
+          user, 1, resource);
+      newMetrics.allocateResources(liveContainer.getNodeLabelExpression(),
+          user, 1, resource, false);
     }
     }
     for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
     for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
       for (RMContainer reservedContainer : map.values()) {
       for (RMContainer reservedContainer : map.values()) {
         ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
         ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
         Resource resource = reservedContainer.getReservedResource();
         Resource resource = reservedContainer.getReservedResource();
-        oldMetrics.unreserveResource(user, resource);
-        newMetrics.reserveResource(user, resource);
+        oldMetrics.unreserveResource(reservedContainer.getNodeLabelExpression(),
+            user, resource);
+        newMetrics.reserveResource(reservedContainer.getNodeLabelExpression(),
+            user, resource);
       }
       }
     }
     }
 
 

+ 26 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java

@@ -25,12 +25,14 @@ import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 
 
 @Metrics(context = "yarn")
 @Metrics(context = "yarn")
 public class CSQueueMetrics extends QueueMetrics {
 public class CSQueueMetrics extends QueueMetrics {
 
 
+  //Metrics updated only for "default" partition
   @Metric("AM memory limit in MB")
   @Metric("AM memory limit in MB")
   MutableGaugeLong AMResourceLimitMB;
   MutableGaugeLong AMResourceLimitMB;
   @Metric("AM CPU limit in virtual cores")
   @Metric("AM CPU limit in virtual cores")
@@ -61,33 +63,40 @@ public class CSQueueMetrics extends QueueMetrics {
     return usedAMResourceVCores.value();
     return usedAMResourceVCores.value();
   }
   }
 
 
-  public void setAMResouceLimit(Resource res) {
-    AMResourceLimitMB.set(res.getMemorySize());
-    AMResourceLimitVCores.set(res.getVirtualCores());
+  public void setAMResouceLimit(String partition, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      AMResourceLimitMB.set(res.getMemorySize());
+      AMResourceLimitVCores.set(res.getVirtualCores());
+    }
   }
   }
 
 
-  public void setAMResouceLimitForUser(String user, Resource res) {
+  public void setAMResouceLimitForUser(String partition,
+      String user, Resource res) {
     CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
     CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
     if (userMetrics != null) {
     if (userMetrics != null) {
-      userMetrics.setAMResouceLimit(res);
+      userMetrics.setAMResouceLimit(partition, res);
     }
     }
   }
   }
 
 
-  public void incAMUsed(String user, Resource res) {
-    usedAMResourceMB.incr(res.getMemorySize());
-    usedAMResourceVCores.incr(res.getVirtualCores());
-    CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.incAMUsed(user, res);
+  public void incAMUsed(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      usedAMResourceMB.incr(res.getMemorySize());
+      usedAMResourceVCores.incr(res.getVirtualCores());
+      CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.incAMUsed(partition, user, res);
+      }
     }
     }
   }
   }
 
 
-  public void decAMUsed(String user, Resource res) {
-    usedAMResourceMB.decr(res.getMemorySize());
-    usedAMResourceVCores.decr(res.getVirtualCores());
-    CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.decAMUsed(user, res);
+  public void decAMUsed(String partition, String user, Resource res) {
+    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      usedAMResourceMB.decr(res.getMemorySize());
+      usedAMResourceVCores.decr(res.getVirtualCores());
+      CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
+      if (userMetrics != null) {
+        userMetrics.decAMUsed(partition, user, res);
+      }
     }
     }
   }
   }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java

@@ -290,7 +290,7 @@ class CSQueueUtils {
 
 
     // Update queue metrics w.r.t node labels. In a generic way, we can
     // Update queue metrics w.r.t node labels. In a generic way, we can
     // calculate available resource from all labels in cluster.
     // calculate available resource from all labels in cluster.
-    childQueue.getMetrics().setAvailableResourcesToQueue(
+    childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
         getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
         getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
    }
    }
 }
 }

+ 11 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -630,7 +630,7 @@ public class LeafQueue extends AbstractCSQueue {
         resourceCalculator, queuePartitionUsableResource, amResourcePercent,
         resourceCalculator, queuePartitionUsableResource, amResourcePercent,
         minimumAllocation);
         minimumAllocation);
 
 
-    metrics.setAMResouceLimit(amResouceLimit);
+    metrics.setAMResouceLimit(nodePartition, amResouceLimit);
     queueUsage.setAMLimit(nodePartition, amResouceLimit);
     queueUsage.setAMLimit(nodePartition, amResouceLimit);
     return amResouceLimit;
     return amResouceLimit;
   }
   }
@@ -742,9 +742,10 @@ public class LeafQueue extends AbstractCSQueue {
       user.getResourceUsage().incAMUsed(partitionName,
       user.getResourceUsage().incAMUsed(partitionName,
           application.getAMResource(partitionName));
           application.getAMResource(partitionName));
       user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
       user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
-      metrics.incAMUsed(application.getUser(),
+      metrics.incAMUsed(partitionName, application.getUser(),
           application.getAMResource(partitionName));
           application.getAMResource(partitionName));
-      metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
+      metrics.setAMResouceLimitForUser(partitionName,
+          application.getUser(), userAMLimit);
       fsApp.remove();
       fsApp.remove();
       LOG.info("Application " + applicationId + " from user: "
       LOG.info("Application " + applicationId + " from user: "
           + application.getUser() + " activated in queue: " + getQueueName());
           + application.getUser() + " activated in queue: " + getQueueName());
@@ -810,7 +811,8 @@ public class LeafQueue extends AbstractCSQueue {
           application.getAMResource(partitionName));
           application.getAMResource(partitionName));
       user.getResourceUsage().decAMUsed(partitionName,
       user.getResourceUsage().decAMUsed(partitionName,
           application.getAMResource(partitionName));
           application.getAMResource(partitionName));
-      metrics.decAMUsed(application.getUser(), application.getAMResource());
+      metrics.decAMUsed(partitionName,
+          application.getUser(), application.getAMResource());
     }
     }
     applicationAttemptMap.remove(application.getApplicationAttemptId());
     applicationAttemptMap.remove(application.getApplicationAttemptId());
 
 
@@ -1140,7 +1142,7 @@ public class LeafQueue extends AbstractCSQueue {
     
     
     application.setHeadroomProvider(headroomProvider);
     application.setHeadroomProvider(headroomProvider);
 
 
-    metrics.setAvailableResourcesToUser(user, headroom);
+    metrics.setAvailableResourcesToUser(nodePartition, user, headroom);
     
     
     return userLimit;
     return userLimit;
   }
   }
@@ -1508,11 +1510,11 @@ public class LeafQueue extends AbstractCSQueue {
     updateQueueUsageRatio(nodePartition,
     updateQueueUsageRatio(nodePartition,
         user.updateUsageRatio(resourceCalculator, resourceByLabel,
         user.updateUsageRatio(resourceCalculator, resourceByLabel,
             nodePartition));
             nodePartition));
-
     // Note this is a bit unconventional since it gets the object and modifies
     // Note this is a bit unconventional since it gets the object and modifies
     // it here, rather then using set routine
     // it here, rather then using set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
-    metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
+    metrics.setAvailableResourcesToUser(nodePartition,
+        userName, application.getHeadroom());
     
     
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getQueueName() +
       LOG.debug(getQueueName() +
@@ -1556,7 +1558,8 @@ public class LeafQueue extends AbstractCSQueue {
         user.updateUsageRatio(resourceCalculator, resourceByLabel,
         user.updateUsageRatio(resourceCalculator, resourceByLabel,
             nodePartition));
             nodePartition));
 
 
-    metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
+    metrics.setAvailableResourcesToUser(nodePartition,
+        userName, application.getHeadroom());
 
 
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug(getQueueName() +
       LOG.debug(getQueueName() +

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -24,7 +24,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.List;
 import java.util.Map;
 import java.util.Map;
 import java.util.Set;
 import java.util.Set;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -153,7 +152,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       ContainerStatus containerStatus, RMContainerEventType event,
       ContainerStatus containerStatus, RMContainerEventType event,
       String partition) {
       String partition) {
     ContainerId containerId = rmContainer.getContainerId();
     ContainerId containerId = rmContainer.getContainerId();
-
     // Remove from the list of containers
     // Remove from the list of containers
     if (null == liveContainers.remove(containerId)) {
     if (null == liveContainers.remove(containerId)) {
       return false;
       return false;
@@ -174,7 +172,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     
     
     // Update usage metrics 
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     Resource containerResource = rmContainer.getContainer().getResource();
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+    queue.getMetrics().releaseResources(partition,
+        getUser(), 1, containerResource);
     attemptResourceUsage.decUsed(partition, containerResource);
     attemptResourceUsage.decUsed(partition, containerResource);
 
 
     // Clear resource utilization metrics cache.
     // Clear resource utilization metrics cache.
@@ -240,6 +239,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
 
   public synchronized boolean unreserve(Priority priority,
   public synchronized boolean unreserve(Priority priority,
       FiCaSchedulerNode node, RMContainer rmContainer) {
       FiCaSchedulerNode node, RMContainer rmContainer) {
+
     // Cancel increase request (if it has reserved increase request 
     // Cancel increase request (if it has reserved increase request 
     rmContainer.cancelIncreaseReservation();
     rmContainer.cancelIncreaseReservation();
     
     
@@ -248,8 +248,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       node.unreserveResource(this);
       node.unreserveResource(this);
 
 
       // Update reserved metrics
       // Update reserved metrics
-      queue.getMetrics().unreserveResource(getUser(),
-          rmContainer.getReservedResource());
+      queue.getMetrics().unreserveResource(node.getPartition(),
+          getUser(), rmContainer.getReservedResource());
       queue.decReservedResource(node.getPartition(),
       queue.decReservedResource(node.getPartition(),
           rmContainer.getReservedResource());
           rmContainer.getReservedResource());
       return true;
       return true;
@@ -444,7 +444,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (super.reserveIncreasedContainer(node, priority, rmContainer,
     if (super.reserveIncreasedContainer(node, priority, rmContainer,
         reservedResource)) {
         reservedResource)) {
 
 
-      queue.getMetrics().reserveResource(getUser(), reservedResource);
+      queue.getMetrics().reserveResource(node.getPartition(),
+          getUser(), reservedResource);
 
 
       // Update the node
       // Update the node
       node.reserveResource(this, priority, rmContainer);
       node.reserveResource(this, priority, rmContainer);
@@ -460,7 +461,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
       FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
     // Update reserved metrics if this is the first reservation
     // Update reserved metrics if this is the first reservation
     if (rmContainer == null) {
     if (rmContainer == null) {
-      queue.getMetrics().reserveResource(
+      queue.getMetrics().reserveResource(node.getPartition(),
           getUser(), container.getResource());
           getUser(), container.getResource());
     }
     }
 
 

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -154,7 +154,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     
     
     // Update usage metrics 
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     Resource containerResource = rmContainer.getContainer().getResource();
-    queue.getMetrics().releaseResources(getUser(), 1, containerResource);
+    queue.getMetrics().releaseResources(rmContainer.getNodeLabelExpression(),
+        getUser(), 1, containerResource);
     this.attemptResourceUsage.decUsed(containerResource);
     this.attemptResourceUsage.decUsed(containerResource);
 
 
     // remove from preemption map if it is completed
     // remove from preemption map if it is completed
@@ -491,7 +492,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       LOG.info("Making reservation: node=" + node.getNodeName() +
       LOG.info("Making reservation: node=" + node.getNodeName() +
               " app_id=" + getApplicationId());
               " app_id=" + getApplicationId());
       if (!alreadyReserved) {
       if (!alreadyReserved) {
-        getMetrics().reserveResource(getUser(), container.getResource());
+        getMetrics().reserveResource(node.getPartition(),
+            getUser(), container.getResource());
         RMContainer rmContainer =
         RMContainer rmContainer =
                 super.reserve(node, priority, null, container);
                 super.reserve(node, priority, null, container);
         node.reserveResource(this, priority, rmContainer);
         node.reserveResource(this, priority, rmContainer);
@@ -545,7 +547,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     unreserveInternal(priority, node);
     unreserveInternal(priority, node);
     node.unreserveResource(this);
     node.unreserveResource(this);
     clearReservation(node);
     clearReservation(node);
-    getMetrics().unreserveResource(
+    getMetrics().unreserveResource(node.getPartition(),
         getUser(), rmContainer.getContainer().getResource());
         getUser(), rmContainer.getContainer().getResource());
   }
   }
 
 

+ 41 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -72,8 +73,10 @@ public class TestQueueMetrics {
     metrics.submitAppAttempt(user);
     metrics.submitAppAttempt(user);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
 
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
-    metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
+    metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+        Resources.createResource(100*GB, 100));
+    metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
+        user, 5, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
     // configurable cluster/queue resources
     checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@@ -81,17 +84,21 @@ public class TestQueueMetrics {
     metrics.runAppAttempt(app.getApplicationId(), user);
     metrics.runAppAttempt(app.getApplicationId(), user);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
 
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
+    metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
+        user, 3, Resources.createResource(2*GB, 2), true);
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
+        user, 1, Resources.createResource(2*GB, 2));
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
 
 
-    metrics.incrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
+    metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
+        user, 0, Resources.createResource(2 * GB, 2));
     checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
     checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
         0, 0, 0);
         0, 0, 0);
 
 
-    metrics.decrPendingResources(user, 0, Resources.createResource(2 * GB, 2));
+    metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
+        user, 0, Resources.createResource(2 * GB, 2));
     checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
     checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
         0, 0, 0);
         0, 0, 0);
 
 
@@ -177,9 +184,12 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
 
 
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
-    metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
+    metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+        Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
+        user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
+        user, 5, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
     // configurable cluster/queue resources
     checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(queueSource, 0, 0, 0, 0, 0,  100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
@@ -189,11 +199,13 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
 
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
+    metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
+        user, 3, Resources.createResource(2*GB, 2), true);
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
+    metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
+        user, 1, Resources.createResource(2*GB, 2));
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
 
 
@@ -283,11 +295,16 @@ public class TestQueueMetrics {
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
     checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
 
 
-    parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
-    metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100));
-    parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
-    metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10));
-    metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3));
+    parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+        Resources.createResource(100*GB, 100));
+    metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
+        Resources.createResource(100*GB, 100));
+    parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
+        user, Resources.createResource(10*GB, 10));
+    metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
+        user, Resources.createResource(10*GB, 10));
+    metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
+        user, 5, Resources.createResource(3*GB, 3));
     checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
     checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
     checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
@@ -297,8 +314,10 @@ public class TestQueueMetrics {
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
     checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
 
 
-    metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true);
-    metrics.reserveResource(user, Resources.createResource(3*GB, 3));
+    metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
+        user, 3, Resources.createResource(2*GB, 2), true);
+    metrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
+        user, Resources.createResource(3*GB, 3));
     // Available resources is set externally, as it depends on dynamic
     // Available resources is set externally, as it depends on dynamic
     // configurable cluster/queue resources
     // configurable cluster/queue resources
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
     checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
@@ -306,8 +325,10 @@ public class TestQueueMetrics {
     checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
     checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
     checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
     checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
 
 
-    metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2));
-    metrics.unreserveResource(user, Resources.createResource(3*GB, 3));
+    metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
+        user, 1, Resources.createResource(2*GB, 2));
+    metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
+          user, Resources.createResource(3*GB, 3));
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
     checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java

@@ -99,7 +99,8 @@ public class TestSchedulerApplicationAttempt {
     Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
     Map<NodeId, RMContainer> reservations = new HashMap<NodeId, RMContainer>();
     reservations.put(node.getNodeID(), container2);
     reservations.put(node.getNodeID(), container2);
     app.reservedContainers.put(prio1, reservations);
     app.reservedContainers.put(prio1, reservations);
-    oldMetrics.reserveResource(user, reservedResource);
+    oldMetrics.reserveResource(container2.getNodeLabelExpression(),
+        user, reservedResource);
     
     
     checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
     checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4);
     checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);
     checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0);

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -1848,8 +1848,8 @@ public class TestNodeLabelContainerAllocation {
         reportNm2.getAvailableResource().getMemorySize());
         reportNm2.getAvailableResource().getMemorySize());
 
 
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
-    assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB());
-    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB());
     rm1.close();
     rm1.close();
   }
   }
 
 
@@ -1943,8 +1943,8 @@ public class TestNodeLabelContainerAllocation {
     double delta = 0.0001;
     double delta = 0.0001;
     // 3GB is used from label x quota. 1.5 GB is remaining from default label.
     // 3GB is used from label x quota. 1.5 GB is remaining from default label.
     // 2GB is remaining from label x.
     // 2GB is remaining from label x.
-    assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
-    assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
 
 
     // app1 asks for 1 default partition container
     // app1 asks for 1 default partition container
     am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
     am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
@@ -1961,7 +1961,7 @@ public class TestNodeLabelContainerAllocation {
     // 3GB is used from label x quota. 2GB used from default label.
     // 3GB is used from label x quota. 2GB used from default label.
     // So total 2.5 GB is remaining.
     // So total 2.5 GB is remaining.
     assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
     assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
-    assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB());
 
 
     rm1.close();
     rm1.close();
   }
   }