YARN-2628. Capacity scheduler with DominantResourceCalculator carries out reservation even though slots are free. Contributed by Varun Vasudev
(cherry picked from commit 054f28552687e9b9859c0126e16a2066e20ead3f)

Jian He 10 years ago
parent
commit
65cf007ab9

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -470,6 +470,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2527. Fixed the potential NPE in ApplicationACLsManager and added test
     cases for it. (Benoy Antony via zjshen)
 
+    YARN-2628. Capacity scheduler with DominantResourceCalculator carries out
+    reservation even though slots are free. (Varun Vasudev via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -941,8 +941,8 @@ public class CapacityScheduler extends
 
     // Try to schedule more if there are no reservations to fulfill
     if (node.getReservedContainer() == null) {
-      if (Resources.greaterThanOrEqual(calculator, getClusterResource(),
-          node.getAvailableResource(), minimumAllocation)) {
+      if (calculator.computeAvailableContainers(node.getAvailableResource(),
+        minimumAllocation) > 0) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());

+ 64 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -115,6 +116,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -1995,4 +1997,66 @@ public class TestCapacityScheduler {
     rm.stop();
   }
 
+  // Test to ensure that we don't carry out reservation on nodes
+  // that have no CPU available when using the DominantResourceCalculator
+  @Test(timeout = 30000)
+  public void testAppReservationWithDominantResourceCalculator() throws Exception {
+    CapacitySchedulerConfiguration csconf =
+        new CapacitySchedulerConfiguration();
+    csconf.setResourceComparator(DominantResourceCalculator.class);
+
+    YarnConfiguration conf = new YarnConfiguration(csconf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 10 * GB, 1);
+
+    // register extra nodes to bump up cluster resource
+    MockNM nm2 = rm.registerNode("127.0.0.1:1235", 10 * GB, 4);
+    rm.registerNode("127.0.0.1:1236", 10 * GB, 4);
+
+    RMApp app1 = rm.submitApp(1024);
+    // kick the scheduling
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    SchedulerNodeReport report_nm1 =
+        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+    // check node report
+    Assert.assertEquals(1 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(9 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers
+    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 1 * GB, 1, 1);
+    am1.schedule(); // send the request
+
+    // kick the scheduler, container reservation should not happen
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+    AllocateResponse allocResponse = am1.schedule();
+    ApplicationResourceUsageReport report =
+        rm.getResourceScheduler().getAppResourceUsageReport(
+          attempt1.getAppAttemptId());
+    Assert.assertEquals(0, allocResponse.getAllocatedContainers().size());
+    Assert.assertEquals(0, report.getNumReservedContainers());
+
+    // container should get allocated on this node
+    nm2.nodeHeartbeat(true);
+
+    while (allocResponse.getAllocatedContainers().size() == 0) {
+      Thread.sleep(100);
+      allocResponse = am1.schedule();
+    }
+    report =
+        rm.getResourceScheduler().getAppResourceUsageReport(
+          attempt1.getAppAttemptId());
+    Assert.assertEquals(1, allocResponse.getAllocatedContainers().size());
+    Assert.assertEquals(0, report.getNumReservedContainers());
+    rm.stop();
+  }
 }