Browse Source

YARN-7527. Over-allocate node resource in async-scheduling mode of CapacityScheduler. (Tao Yang via wangda)

Change-Id: I51ae6c2ab7a3d1febdd7d8d0519b63a13295ac7d
(cherry picked from commit 0d781dd03b979d65de94978071b2faa55005b34a)
Wangda Tan 7 years ago
parent
commit
adc5157a3c

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -417,7 +417,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
 
           // Common part of check container allocation regardless if it is a
           // Common part of check container allocation regardless if it is a
           // increase container or regular container
           // increase container or regular container
-          commonCheckContainerAllocation(allocation, schedulerContainer);
+          if (!commonCheckContainerAllocation(allocation, schedulerContainer)) {
+            return false;
+          }
         } else {
         } else {
           // Container reserved first time will be NEW, after the container
           // Container reserved first time will be NEW, after the container
           // accepted & confirmed, it will become RESERVED state
           // accepted & confirmed, it will become RESERVED state

+ 71 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -405,6 +405,77 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  @Test (timeout = 30000)
+  public void testNodeResourceOverAllocated()
+      throws Exception {
+    // disable async-scheduling for simulating complex scene
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
+    List<MockNM> nmLst = new ArrayList<>();
+    nmLst.add(nm1);
+    nmLst.add(nm2);
+
+    // init scheduler & nodes
+    while (
+        ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
+            .nodeCount() < 2) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode sn1 = scheduler.getSchedulerNode(nm1.getNodeId());
+
+    // launch app
+    RMApp app = rm.submitApp(200, "app", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    FiCaSchedulerApp schedulerApp =
+        scheduler.getApplicationAttempt(am.getApplicationAttemptId());
+    // allocate 2 containers and running on nm1
+    Resource containerResource = Resources.createResource(5 * GB);
+    am.allocate(Arrays.asList(ResourceRequest
+            .newInstance(Priority.newInstance(0), "*", containerResource, 2)),
+        null);
+
+    // generate over-allocated proposals for nm1
+    for (int containerNo = 2; containerNo <= 3; containerNo++) {
+      Container container = Container.newInstance(
+          ContainerId.newContainerId(am.getApplicationAttemptId(), containerNo),
+          sn1.getNodeID(), sn1.getHttpAddress(), containerResource,
+          Priority.newInstance(0), null);
+      RMContainer rmContainer = new RMContainerImpl(container,
+          SchedulerRequestKey.create(ResourceRequest
+              .newInstance(Priority.newInstance(0), "*", containerResource, 1)),
+          am.getApplicationAttemptId(), sn1.getNodeID(), "user",
+          rm.getRMContext());
+      SchedulerContainer newContainer = new SchedulerContainer(schedulerApp,
+          scheduler.getNode(sn1.getNodeID()), rmContainer, "", true);
+      ContainerAllocationProposal newContainerProposal =
+          new ContainerAllocationProposal(newContainer, null, null,
+              NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
+              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, containerResource);
+      List<ContainerAllocationProposal> newProposals = new ArrayList<>();
+      newProposals.add(newContainerProposal);
+      ResourceCommitRequest request =
+          new ResourceCommitRequest(newProposals, null, null);
+      scheduler.tryCommit(scheduler.getClusterResource(), request);
+    }
+    // make sure node resource can't be over-allocated!
+    Assert.assertTrue("Node resource is Over-allocated!",
+        sn1.getUnallocatedResource().getMemorySize() > 0);
+    rm.stop();
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {
       throws Exception {