Browse Source

YARN-474. Fix CapacityScheduler to trigger application-activation when am-resource-percent configuration is refreshed. Contributed by Zhijie Shen.
svn merge --ignore-ancestry -c 1461402 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461403 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 years ago
parent
commit
8a21e3925c

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -88,6 +88,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-498. Unmanaged AM launcher does not set various constants in env for
     YARN-498. Unmanaged AM launcher does not set various constants in env for
     an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas)
     an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas)
 
 
+    YARN-474. Fix CapacityScheduler to trigger application-activation when
+    am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 Release 2.0.4-alpha - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -607,6 +607,10 @@ public class LeafQueue implements CSQueue {
         newlyParsedLeafQueue.getMaximumActiveApplications(), 
         newlyParsedLeafQueue.getMaximumActiveApplications(), 
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
+
+    // queue metrics are updated, more resource may be available
+    // activate the pending applications if possible
+    activateApplications();
   }
   }
 
 
   @Override
   @Override

+ 60 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -138,6 +138,7 @@ public class TestLeafQueue {
   private static final String C = "c";
   private static final String C = "c";
   private static final String C1 = "c1";
   private static final String C1 = "c1";
   private static final String D = "d";
   private static final String D = "d";
+  private static final String E = "e";
   private void setupQueueConfiguration(
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf, 
       CapacitySchedulerConfiguration conf, 
       final String newRoot) {
       final String newRoot) {
@@ -148,7 +149,7 @@ public class TestLeafQueue {
     conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
     conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
     
     
     final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
     final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
-    conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
+    conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
@@ -174,10 +175,14 @@ public class TestLeafQueue {
     conf.setCapacity(Q_C1, 100);
     conf.setCapacity(Q_C1, 100);
 
 
     final String Q_D = Q_newRoot + "." + D;
     final String Q_D = Q_newRoot + "." + D;
-    conf.setCapacity(Q_D, 10);
+    conf.setCapacity(Q_D, 9);
     conf.setMaximumCapacity(Q_D, 11);
     conf.setMaximumCapacity(Q_D, 11);
     conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
     conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
     
     
+    final String Q_E = Q_newRoot + "." + E;
+    conf.setCapacity(Q_E, 1);
+    conf.setMaximumCapacity(Q_E, 1);
+    conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
   }
   }
 
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -1567,6 +1572,59 @@ public class TestLeafQueue {
 
 
   }
   }
 
 
+  @Test (timeout = 30000)
+  public void testActivateApplicationAfterQueueRefresh() throws Exception {
+
+    // Manipulate queue 'e'
+    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+
+    // Users
+    final String user_e = "user_e";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_e, e,
+            mock(ActiveUsersManager.class), rmContext);
+    e.submitApplication(app_0, user_e, E);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_e, e,
+            mock(ActiveUsersManager.class), rmContext);
+    e.submitApplication(app_1, user_e, E);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_e, e,
+            mock(ActiveUsersManager.class), rmContext);
+    e.submitApplication(app_2, user_e, E);  // same user
+
+    // before reinitialization
+    assertEquals(2, e.activeApplications.size());
+    assertEquals(1, e.pendingApplications.size());
+
+    csConf.setDouble(CapacitySchedulerConfiguration
+        .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+        CapacitySchedulerConfiguration
+        .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacityScheduler.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResources());
+
+    // after reinitialization
+    assertEquals(3, e.activeApplications.size());
+    assertEquals(0, e.pendingApplications.size());
+  }
+
   public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
   public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
     for (QueueUserACLInfo aclInfo : aclInfos) {
     for (QueueUserACLInfo aclInfo : aclInfos) {
       if (aclInfo.getUserAcls().contains(acl)) {
       if (aclInfo.getUserAcls().contains(acl)) {