瀏覽代碼

YARN-7619. Max AM Resource value in Capacity Scheduler UI has to be refreshed for every user. Contributed by Eric Payne.

(cherry picked from commit a79abbc03e75a5d2982de12587bd3b45169cc884)
Sunil G 7 年之前
父節點
當前提交
c4e0821d72

+ 17 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java

@@ -62,7 +62,7 @@ public class ResourceUsage {
     //CACHED_USED and CACHED_PENDING may be read by anyone, but must only
     //be written by ordering policies
     USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
-      CACHED_PENDING(5), AMLIMIT(6);
+      CACHED_PENDING(5), AMLIMIT(6), USERAMLIMIT(7);
 
     private int idx;
 
@@ -434,4 +434,20 @@ public class ResourceUsage {
       readLock.unlock();
     }
   }
+
+  public Resource getUserAMLimit() {
+    return getAMLimit(NL);
+  }
+
+  public Resource getUserAMLimit(String label) {
+    return _get(label, ResourceType.USERAMLIMIT);
+  }
+
+  public void setUserAMLimit(Resource res) {
+    setAMLimit(NL, res);
+  }
+
+  public void setUserAMLimit(String label, Resource res) {
+    _set(label, ResourceType.USERAMLIMIT, res);
+  }
 }

+ 23 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -682,6 +682,7 @@ public class LeafQueue extends AbstractCSQueue {
        */
       float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
           1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
+      float preWeightedUserLimit = effectiveUserLimit;
       effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
       Resource queuePartitionResource = Resources
@@ -696,10 +697,28 @@ public class LeafQueue extends AbstractCSQueue {
           queueCapacities.getMaxAMResourcePercentage(nodePartition)
               * effectiveUserLimit * usersManager.getUserLimitFactor(),
           minimumAllocation);
-      return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
-          userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
-          userAMLimit :
-          getAMResourceLimitPerPartition(nodePartition);
+      userAMLimit =
+          Resources.min(resourceCalculator, lastClusterResource,
+              userAMLimit,
+              Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
+
+      Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
+          resourceCalculator, queuePartitionResource,
+          queueCapacities.getMaxAMResourcePercentage(nodePartition)
+              * preWeightedUserLimit * usersManager.getUserLimitFactor(),
+          minimumAllocation);
+      preWeighteduserAMLimit =
+          Resources.min(resourceCalculator, lastClusterResource,
+              preWeighteduserAMLimit,
+              Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
+      queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Effective user AM limit for \"" + userName + "\":" +
+            preWeighteduserAMLimit + ". " + "Effective weighted user AM limit: "
+            + userAMLimit + ". User weight: " + userWeight);
+      }
+      return userAMLimit;
     } finally {
       readLock.unlock();
     }

+ 23 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java

@@ -143,13 +143,13 @@ class CapacitySchedulerPage extends RmView {
       // Get UserInfo from first user to calculate AM Resource Limit per user.
       ResourceInfo userAMResourceLimit = null;
       ArrayList<UserInfo> usersList = lqinfo.getUsers().getUsersList();
-      if (usersList.isEmpty()) {
-        // If no users are present, consider AM Limit for that queue.
+      if (!usersList.isEmpty()) {
+        userAMResourceLimit = resourceUsages.getUserAmLimit();
+      }
+      // If no users are present or if AM limit per user doesn't exist, retrieve
+      // AM Limit for that queue.
+      if (userAMResourceLimit == null) {
         userAMResourceLimit = resourceUsages.getAMLimit();
-      } else {
-        userAMResourceLimit = usersList.get(0)
-            .getResourceUsageInfo().getPartitionResourceUsageInfo(label)
-            .getAMLimit();
       }
       ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
           ? new ResourceInfo(Resources.none())
@@ -218,11 +218,25 @@ class CapacitySchedulerPage extends RmView {
               .$class("ui-state-default")._("Non-Schedulable Apps")._()._()._()
               .tbody();
 
+      PartitionResourcesInfo queueUsageResources =
+          lqinfo.getResources().getPartitionResourceUsageInfo(
+              nodeLabel == null ? "" : nodeLabel);
+
       ArrayList<UserInfo> users = lqinfo.getUsers().getUsersList();
       for (UserInfo userInfo : users) {
         ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
-        PartitionResourcesInfo resourceUsages = userInfo.getResourceUsageInfo()
-            .getPartitionResourceUsageInfo((nodeLabel == null) ? "" : nodeLabel);
+        ResourceInfo userAMLimitPerPartition =
+            queueUsageResources.getUserAmLimit();
+        // If AM limit per user is null, use the AM limit for the queue level.
+        if (userAMLimitPerPartition == null) {
+          userAMLimitPerPartition = queueUsageResources.getAMLimit();
+        }
+        if (userInfo.getUserWeight() != 1.0) {
+          userAMLimitPerPartition =
+              new ResourceInfo(
+                  Resources.multiply(userAMLimitPerPartition.getResource(),
+                      userInfo.getUserWeight()));
+        }
         if (nodeLabel != null) {
           resourcesUsed = userInfo.getResourceUsageInfo()
               .getPartitionResourceUsageInfo(nodeLabel).getUsed();
@@ -237,7 +251,7 @@ class CapacitySchedulerPage extends RmView {
             .td(userInfo.getUserResourceLimit().toString())
             .td(String.valueOf(userInfo.getUserWeight()))
             .td(resourcesUsed.toString())
-            .td(resourceUsages.getAMLimit().toString())
+            .td(userAMLimitPerPartition.toString())
             .td(amUsed.toString())
             .td(Integer.toString(userInfo.getNumActiveApplications()))
             .td(Integer.toString(userInfo.getNumPendingApplications()))._();

+ 18 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/PartitionResourcesInfo.java

@@ -33,13 +33,15 @@ public class PartitionResourcesInfo {
   private ResourceInfo pending;
   private ResourceInfo amUsed;
   private ResourceInfo amLimit = new ResourceInfo();
+  private ResourceInfo userAmLimit;
 
   public PartitionResourcesInfo() {
   }
 
   public PartitionResourcesInfo(String partitionName, ResourceInfo used,
       ResourceInfo reserved, ResourceInfo pending,
-      ResourceInfo amResourceUsed, ResourceInfo amResourceLimit) {
+      ResourceInfo amResourceUsed, ResourceInfo amResourceLimit,
+      ResourceInfo perUserAmResourceLimit) {
     super();
     this.partitionName = partitionName;
     this.used = used;
@@ -47,6 +49,7 @@ public class PartitionResourcesInfo {
     this.pending = pending;
     this.amUsed = amResourceUsed;
     this.amLimit = amResourceLimit;
+    this.userAmLimit = perUserAmResourceLimit;
   }
 
   public String getPartitionName() {
@@ -96,4 +99,18 @@ public class PartitionResourcesInfo {
   public void setAMLimit(ResourceInfo amLimit) {
     this.amLimit = amLimit;
   }
+
+  /**
+   * @return the userAmLimit
+   */
+  public ResourceInfo getUserAmLimit() {
+    return userAmLimit;
+  }
+
+  /**
+   * @param userAmLimit the userAmLimit to set
+   */
+  public void setUserAmLimit(ResourceInfo userAmLimit) {
+    this.userAmLimit = userAmLimit;
+  }
 }

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ResourcesInfo.java

@@ -51,7 +51,9 @@ public class ResourcesInfo {
           considerAMUsage ? new ResourceInfo(resourceUsage
               .getAMUsed(partitionName)) : null,
           considerAMUsage ? new ResourceInfo(resourceUsage
-              .getAMLimit(partitionName)) : null));
+              .getAMLimit(partitionName)) : null,
+          considerAMUsage ? new ResourceInfo(resourceUsage
+              .getUserAMLimit(partitionName)) : null));
     }
   }
 

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesForCSWithPartitions.java

@@ -501,13 +501,13 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
         partitionInfo = partitionsCapsArray.getJSONObject(0);
         partitionName = partitionInfo.getString("partitionName");
         verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
-        assertEquals("incorrect number of elements", 6,
+        assertEquals("incorrect number of elements", 7,
             partitionsResourcesArray.getJSONObject(0).length());
         break;
       case QUEUE_B:
         assertEquals("Invalid default Label expression", LABEL_LX,
             queueJson.getString("defaultNodeLabelExpression"));
-        assertEquals("incorrect number of elements", 6,
+        assertEquals("incorrect number of elements", 7,
             partitionsResourcesArray.getJSONObject(0).length());
         verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
         assertEquals("incorrect number of partitions", 2,