浏览代码

YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max share (Siqi Li via Sandy Ryza)

Sandy Ryza 10 年之前
父节点
当前提交
1a47f890ba

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -225,6 +225,9 @@ Release 2.6.0 - 2014-11-18
     YARN-2505. Supported get/add/remove/change labels in RM REST API. (Craig Welch
     via zjshen)
 
+    YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
+    share (Siqi Li via Sandy Ryza)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

+ 16 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1029,7 +1029,10 @@ public class FairScheduler extends
     FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
       Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+      FSQueue queue = reservedAppSchedulable.getQueue();
+
+      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
+          || !fitInMaxShare(queue)) {
         // Don't hold the reservation if app can no longer use it
         LOG.info("Releasing reservation that cannot be satisfied for application "
             + reservedAppSchedulable.getApplicationAttemptId()
@@ -1043,7 +1046,6 @@ public class FairScheduler extends
               + reservedAppSchedulable.getApplicationAttemptId()
               + " on node: " + node);
         }
-        
         node.getReservedAppSchedulable().assignReservedContainer(node);
       }
     }
@@ -1065,6 +1067,18 @@ public class FairScheduler extends
     updateRootQueueMetrics();
   }
 
+  private boolean fitInMaxShare(FSQueue queue) {
+    if (Resources.fitsIn(queue.getResourceUsage(), queue.getMaxShare())) {
+      return false;
+    }
+    
+    FSQueue parentQueue = queue.getParent();
+    if (parentQueue != null) {
+      return fitInMaxShare(parentQueue);
+    }
+    return true;
+  }
+
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }

+ 84 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -722,6 +722,85 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   }
 
+  @Test (timeout = 5000)
+  public void testContainerReservationNotExceedingQueueMax() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"root\">");
+    out.println("<queue name=\"queue1\">");
+    out.println("<minResources>1024mb,5vcores</minResources>");
+    out.println("<maxResources>2048mb,10vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queue2\">");
+    out.println("<minResources>1024mb,5vcores</minResources>");
+    out.println("<maxResources>2048mb,10vcores</maxResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+    
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue 1 requests full capacity of the queue
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is allocated app capacity
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Now queue 2 requests likewise
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 2 is allocated app capacity
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+      getResourceUsage().getMemory());
+    
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is waiting with a reservation
+    assertEquals(1024, scheduler.getSchedulerApp(attId1)
+        .getCurrentReservation().getMemory());
+
+    // Now remove app of queue2
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId, RMAppAttemptState.FINISHED, false);
+    scheduler.update();
+    scheduler.handle(appRemovedEvent1);
+
+    // Queue should have no apps
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+    
+    createSchedulingRequest(1024, "queue2", "user2", 1);
+    scheduler.handle(updateEvent);
+    // Make sure allocated memory of queue1 doesn't exceed its maximum
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+    //the reservation of queue1 should be reclaim
+    assertEquals(0, scheduler.getSchedulerApp(attId1).
+        getCurrentReservation().getMemory());
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+  }
+
   @Test
   public void testUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
@@ -2076,9 +2155,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(updateEvent);
     
     assertEquals(1, app.getLiveContainers().size());
-    // Reserved container should still be at lower priority
+    // Reserved container should will be at higher priority,
+    // since old reservation cannot be satisfied
     for (RMContainer container : app.getReservedContainers()) {
-      assertEquals(2, container.getReservedPriority().getPriority());
+      assertEquals(1, container.getReservedPriority().getPriority());
     }
     
     // Complete container
@@ -2091,11 +2171,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.update();
     scheduler.handle(updateEvent);
     
-    // Reserved container (at lower priority) should be run
+    // Reserved container (at higher priority) should be run
     Collection<RMContainer> liveContainers = app.getLiveContainers();
     assertEquals(1, liveContainers.size());
     for (RMContainer liveContainer : liveContainers) {
-      Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
+      Assert.assertEquals(1, liveContainer.getContainer().getPriority().getPriority());
     }
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
     assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());