
YARN-6775. CapacityScheduler: Improvements to assignContainers, avoid unnecessary canAssignToUser/Queue calls. (Nathan Roberts via wangda)

Change-Id: I5951f0997547de7d2e4a30b4ad87ab0a59b3066a
Wangda Tan committed 7 years ago (commit a39617df63)
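
The core of the change: within a single assignContainers pass over the applications in a leaf queue, the per-user limit is computed once for the first application a user owns and then reused for that user's remaining applications, and the queue-capacity check is skipped once its outcome can no longer change. A minimal, runnable sketch of the user-limit caching pattern (the class name, the computeIfAbsent wiring, and the constant limit are illustrative only; the committed code is in the LeafQueue diff below):

import java.util.HashMap;
import java.util.Map;

public class UserLimitCacheSketch {
  // Simplified analogue of the patch's CachedUserLimit: one entry per user,
  // created on first sight and reused for that user's later applications.
  static final class CachedUserLimit {
    final long userLimit;     // the expensive-to-compute limit, as a plain long
    boolean canAssign = true; // flips to false once the user is over its limit
    CachedUserLimit(long userLimit) {
      this.userLimit = userLimit;
    }
  }

  // Stand-in for LeafQueue.computeUserLimit(); in the scheduler this walks
  // queue capacity and usage, which is the work the cache avoids repeating.
  static long computeUserLimit(String user) {
    return 8192;
  }

  public static void main(String[] args) {
    Map<String, CachedUserLimit> userLimits = new HashMap<>();
    // Four applications, three owned by "alice": computeUserLimit runs only
    // twice (once per distinct user) instead of four times.
    for (String owner : new String[] {"alice", "alice", "bob", "alice"}) {
      CachedUserLimit cul = userLimits.computeIfAbsent(
          owner, u -> new CachedUserLimit(computeUserLimit(u)));
      System.out.println(owner + " -> cached limit " + cul.userLimit);
    }
  }
}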

+ 68 - 25
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -956,25 +956,56 @@ public class LeafQueue extends AbstractCSQueue {
         return CSAssignment.NULL_ASSIGNMENT;
       }
 
+      Map<String, CachedUserLimit> userLimits = new HashMap<>();
+      boolean needAssignToQueueCheck = true;
       for (Iterator<FiCaSchedulerApp> assignmentIterator =
            orderingPolicy.getAssignmentIterator(); assignmentIterator
                .hasNext(); ) {
         FiCaSchedulerApp application = assignmentIterator.next();
 
         // Check queue max-capacity limit
-        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-            currentResourceLimits, application.getCurrentReservation(),
-            schedulingMode)) {
-          return CSAssignment.NULL_ASSIGNMENT;
+        Resource appReserved = application.getCurrentReservation();
+        if (needAssignToQueueCheck) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+              currentResourceLimits, appReserved, schedulingMode)) {
+            return CSAssignment.NULL_ASSIGNMENT;
+          }
+          // canAssignToThisQueue passed; if the outcome cannot change for
+          // later applications (no reservation, reservations-continue-looking
+          // off, or a labeled partition), skip the check from here on.
+          if (!this.reservationsContinueLooking
+              || appReserved.equals(Resources.none()) || !node.getPartition()
+                  .equals(CommonNodeLabelsManager.NO_LABEL)) {
+            needAssignToQueueCheck = false;
+          }
         }
 
+        CachedUserLimit cul = userLimits.get(application.getUser());
+        Resource cachedUserLimit = null;
+        if (cul != null) {
+          cachedUserLimit = cul.userLimit;
+        }
         Resource userLimit =
             computeUserLimitAndSetHeadroom(application, clusterResource,
-                node.getPartition(), schedulingMode);
+                node.getPartition(), schedulingMode, cachedUserLimit);
+        if (cul == null) {
+          cul = new CachedUserLimit(userLimit);
+          userLimits.put(application.getUser(), cul);
+        }
 
         // Check user limit
-        if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-            application, node.getPartition(), currentResourceLimits)) {
+        boolean userAssignable = true;
+        if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
+          userAssignable = false;
+        } else {
+          userAssignable =
+              canAssignToUser(clusterResource, application.getUser(), userLimit,
+                  appReserved, node.getPartition(), currentResourceLimits);
+          if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
+            cul.canAssign = false;
+            cul.reservation = appReserved;
+          }
+        }
+        if (!userAssignable) {
           application.updateAMContainerDiagnostics(AMState.ACTIVATED,
               "User capacity has reached its maximum limit.");
           continue;
@@ -1113,19 +1144,21 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
       Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, Resource userLimit) {
     String user = application.getUser();
     User queueUser = getUser(user);
 
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
-    Resource userLimit =
+    if (userLimit == null) {
+      userLimit =
         computeUserLimit(application.getUser(), clusterResource, queueUser,
             nodePartition, schedulingMode, true);
-
+    }
     setQueueResourceLimitsInfo(clusterResource);
 
     Resource headroom =
+        metrics.getUserMetrics(user) == null ? Resources.none() :
         getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
             clusterResource, userLimit, nodePartition);
     
@@ -1133,8 +1166,7 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
           " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
-          " consumed=" + queueUser.getUsed() + 
-          " headroom=" + headroom);
+          " consumed=" + queueUser.getUsed());
     }
     
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@@ -1289,36 +1321,37 @@ public class LeafQueue extends AbstractCSQueue {
   
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
-      String userName, Resource limit, FiCaSchedulerApp application,
+      String userName, Resource limit, Resource rsrv,
       String nodePartition, ResourceLimits currentResourceLimits) {
     User user = getUser(userName);
-
+    Resource used = user.getUsed(nodePartition);
     currentResourceLimits.setAmountNeededUnreserve(Resources.none());
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
         .greaterThan(resourceCalculator, clusterResource,
-            user.getUsed(nodePartition),
+            used,
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking &&
-          nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+      if (this.reservationsContinueLooking && !rsrv.equals(Resources.none())
+          && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getUsed(),
-                application.getCurrentReservation()), limit)) {
+            Resources.subtract(used,
+                rsrv), limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
                 + " will exceed limit based on reservations - " + " consumed: "
-                + user.getUsed() + " reserved: "
-                + application.getCurrentReservation() + " limit: " + limit);
+                + used + " reserved: "
+                + rsrv + " limit: " + limit);
           }
           Resource amountNeededToUnreserve =
-              Resources.subtract(user.getUsed(nodePartition), limit);
+              Resources.subtract(used, limit);
           // we can only acquire a new container if we unreserve first to
           // respect user-limit
           currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
@@ -1328,7 +1361,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getUsed(nodePartition) + " limit: " + limit);
+            + used + " limit: " + limit);
       }
       return false;
     }
@@ -1623,7 +1656,7 @@ public class LeafQueue extends AbstractCSQueue {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
             RMNodeLabelsManager.NO_LABEL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
       }
     }
   }
@@ -1733,7 +1766,7 @@ public class LeafQueue extends AbstractCSQueue {
     public ResourceUsage getResourceUsage() {
       return userResourceUsage;
     }
-    
+
     public synchronized float resetAndUpdateUsageRatio(
         ResourceCalculator resourceCalculator,
         Resource resource, String nodePartition) {
@@ -2109,6 +2142,16 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
+  static class CachedUserLimit {
+    final Resource userLimit;
+    boolean canAssign = true;
+    Resource reservation = Resources.none();
+
+    CachedUserLimit(Resource userLimit) {
+      this.userLimit = userLimit;
+    }
+  }
+
   /**
    * Get all valid users in this queue.
    * @return user list
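
A note on the two caching rules in the LeafQueue hunks above (my reading of the code, not text from the patch). The needAssignToQueueCheck flag stops repeat calls to canAssignToThisQueue once a check has passed for an application whose reservation cannot affect the result. The per-user cache is deliberately conservative about negatives: a later application of the same user is skipped on a cached canAssign == false only when its reservation fits inside the reservation recorded at the time of the failure, because under reservations-continue-looking an application holding a larger reservation may still become assignable by unreserving. That is why cul.reservation is updated to track the largest reservation seen to fail.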

+ 145 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -145,8 +146,12 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -155,6 +160,8 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import java.util.Enumeration;
+import java.util.PriorityQueue;
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -3023,6 +3030,7 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+
   @Test
   public void testHeadRoomCalculationWithDRC() throws Exception {
     // test with total cluster resource of 20GB memory and 20 vcores.
@@ -3570,6 +3578,143 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  @Test (timeout = 300000)
+  public void testUserLimitThroughput() throws Exception {
+    // Since this is more of a performance unit test, only run if
+    // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
+    Assume.assumeTrue(Boolean.valueOf(
+        System.getProperty("RunUserLimitThroughput")));
+
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
+    csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
+        100.0f);
+    csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue qb = (LeafQueue)cs.getQueue("default");
+
+    // For now make user limit large so we can activate all applications
+    qb.setUserLimitFactor((float)100.0);
+    qb.setupConfigurableCapacities();
+
+    SchedulerEvent addAppEvent;
+    SchedulerEvent addAttemptEvent;
+    Container container = mock(Container.class);
+    ApplicationSubmissionContext submissionContext =
+        mock(ApplicationSubmissionContext.class);
+
+    final int appCount = 100;
+    ApplicationId[] appids = new ApplicationId[appCount];
+    RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
+    ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
+    RMAppImpl[] apps = new RMAppImpl[appCount];
+    RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
+    for (int i = 0; i < appCount; i++) {
+      appids[i] = BuilderUtils.newApplicationId(100, i);
+      appAttemptIds[i] =
+          BuilderUtils.newApplicationAttemptId(appids[i], 1);
+
+      attemptMetrics[i] =
+          new RMAppAttemptMetrics(appAttemptIds[i], rm.getRMContext());
+      apps[i] = mock(RMAppImpl.class);
+      when(apps[i].getApplicationId()).thenReturn(appids[i]);
+      attempts[i] = mock(RMAppAttemptImpl.class);
+      when(attempts[i].getMasterContainer()).thenReturn(container);
+      when(attempts[i].getSubmissionContext()).thenReturn(submissionContext);
+      when(attempts[i].getAppAttemptId()).thenReturn(appAttemptIds[i]);
+      when(attempts[i].getRMAppAttemptMetrics()).thenReturn(attemptMetrics[i]);
+      when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
+
+      rm.getRMContext().getRMApps().put(appids[i], apps[i]);
+      addAppEvent =
+          new AppAddedSchedulerEvent(appids[i], "default", "user1");
+      cs.handle(addAppEvent);
+      addAttemptEvent =
+          new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
+      cs.handle(addAttemptEvent);
+    }
+
+    // add nodes to the cluster, so the cluster has 20GB and 20 vcores
+    Resource newResource = Resource.newInstance(10 * GB, 10);
+    RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
+    cs.handle(new NodeAddedSchedulerEvent(node));
+
+    Resource newResource2 = Resource.newInstance(10 * GB, 10);
+    RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
+    cs.handle(new NodeAddedSchedulerEvent(node2));
+
+    Priority u0Priority = TestUtils.createMockPriority(1);
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
+    for (int i = 0; i < appCount; i++) {
+      fiCaApps[i] =
+          cs.getSchedulerApplications().get(apps[i].getApplicationId())
+              .getCurrentAppAttempt();
+      // request a container with 1GB memory and 1 vcore for each app
+      fiCaApps[i].updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+              u0Priority, recordFactory)));
+    }
+    // Now force everything to be over user limit
+    qb.setUserLimitFactor((float)0.0);
+
+    // Quiet the loggers while measuring throughput
+    for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
+        loggers.hasMoreElements();) {
+      Logger logger = (Logger) loggers.nextElement();
+      logger.setLevel(Level.WARN);
+    }
+    final int topn = 20;
+    final int iterations = 2000000;
+    final int printInterval = 20000;
+    final float numerator = 1000.0f * printInterval;
+    PriorityQueue<Long> queue = new PriorityQueue<>(topn,
+        Collections.reverseOrder());
+
+    long n = Time.monotonicNow();
+    long timespent = 0;
+    for (int i = 0; i < iterations; i += 2) {
+      if (i > 0 && i % printInterval == 0) {
+        long ts = (Time.monotonicNow() - n);
+        if (queue.size() < topn) {
+          queue.offer(ts);
+        } else {
+          Long last = queue.peek();
+          if (last > ts) {
+            queue.poll();
+            queue.offer(ts);
+          }
+        }
+        System.out.println(i + " " + (numerator / ts));
+        n = Time.monotonicNow();
+      }
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+    }
+    timespent = 0;
+    int entries = queue.size();
+    while (queue.size() > 0) {
+      long l = queue.poll();
+      timespent += l;
+    }
+    System.out.println("Avg of fastest " + entries + ": "
+        + numerator / (timespent / entries));
+    rm.stop();
+  }
+
   @Test
   public void testCSQueueBlocked() throws Exception {
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
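
The new test is a throughput benchmark rather than a correctness test, so it is skipped unless the RunUserLimitThroughput system property is set. Assuming a standard Maven build (the exact invocation is an assumption, not part of the patch), it would be run along the lines of:

  mvn test -Dtest=TestCapacityScheduler#testUserLimitThroughput -DRunUserLimitThroughput=true

The test drives two mock nodes through 2,000,000 node-update events against 100 applications that are all forced over their user limit, prints the handling rate every 20,000 events, and finishes with the average of the fastest 20 intervals; that final figure is the one to compare before and after this change.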

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -932,7 +932,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 
     //maxqueue 16G, userlimit 13G, - 4G used = 9G
     assertEquals(9*GB,app_0.getHeadroom().getMemorySize());
@@ -951,7 +951,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
 
     assertEquals(8*GB, qb.getUsedResources().getMemorySize());
     assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
@@ -995,7 +995,7 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     assertEquals(4*GB, qb.getUsedResources().getMemorySize());
     //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
     assertEquals(5*GB, app_3.getHeadroom().getMemorySize());
@@ -1013,9 +1013,9 @@ public class TestLeafQueue {
     qb.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
-        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
     
     
     //app3 is user1, active from last test case

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java

@@ -1207,7 +1207,7 @@ public class TestReservations {
     // not over the limit
     Resource limit = Resources.createResource(14 * GB, 0);
     ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
-    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
+    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(), "", userResourceLimits);
     assertTrue(res);
     assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
 
@@ -1215,7 +1215,7 @@ public class TestReservations {
     // set limit so it subtracts reservations and it can continue
     limit = Resources.createResource(12 * GB, 0);
     userResourceLimits = new ResourceLimits(clusterResource);
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(),
              "", userResourceLimits);
     assertTrue(res);
     // limit set to 12GB, we are using 13GB (8 allocated,  5 reserved), to get under limit
@@ -1228,7 +1228,7 @@ public class TestReservations {
     userResourceLimits = new ResourceLimits(clusterResource);
 
     // should now return false since feature off
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0.getCurrentReservation(), "", userResourceLimits);
     assertFalse(res);
     assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
   }
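
The test updates in TestLeafQueue and TestReservations follow mechanically from the two signature changes: computeUserLimitAndSetHeadroom gains a cached-limit parameter (passing null preserves the old compute-every-time behavior), and canAssignToUser now takes the application's current reservation as a plain Resource rather than the whole FiCaSchedulerApp, which is what lets assignContainers compare and cache reservations per user.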