浏览代码

YARN-525. make CS node-locality-delay refreshable. Contributed by Thomas Graves

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1465013 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 年之前
父节点
当前提交
e4794f75a5

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -38,6 +38,8 @@ Release 0.23.7 - UNRELEASED
     YARN-200. yarn log does not output all needed information, and is in a
     binary format (Ravi Prakash via jlowe)
 
+    YARN-525. make CS node-locality-delay refreshable (Thomas Graves via jlowe)
+
   OPTIMIZATIONS
 
     YARN-357. App submission should not be synchronized (daryn)

+ 10 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -124,7 +124,7 @@ public class LeafQueue implements CSQueue {
   
   private final ActiveUsersManager activeUsersManager;
   
-  private final int nodeLocalityDelay;
+  private int nodeLocalityDelay;
   
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, 
@@ -189,9 +189,6 @@ public class LeafQueue implements CSQueue {
 
     Map<QueueACL, AccessControlList> acls = 
       cs.getConfiguration().getAcls(getQueuePath());
-
-    this.nodeLocalityDelay = 
-        cs.getConfiguration().getNodeLocalityDelay();
     
     setupQueueConfigs(
         cs.getClusterResources(),
@@ -200,7 +197,7 @@ public class LeafQueue implements CSQueue {
         userLimit, userLimitFactor, 
         maxApplications, maxApplicationsPerUser,
         maxActiveApplications, maxActiveApplicationsPerUser,
-        state, acls);
+        state, acls, cs.getConfiguration().getNodeLocalityDelay());
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
@@ -219,7 +216,8 @@ public class LeafQueue implements CSQueue {
       int userLimit, float userLimitFactor,
       int maxApplications, int maxApplicationsPerUser,
       int maxActiveApplications, int maxActiveApplicationsPerUser,
-      QueueState state, Map<QueueACL, AccessControlList> acls)
+      QueueState state, Map<QueueACL, AccessControlList> acls, 
+      int nodeLocalityDelay)
   {
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
@@ -248,6 +246,8 @@ public class LeafQueue implements CSQueue {
     this.queueInfo.setCapacity(this.capacity);
     this.queueInfo.setMaximumCapacity(this.maximumCapacity);
     this.queueInfo.setQueueState(this.state);
+    
+    this.nodeLocalityDelay = nodeLocalityDelay;
 
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
@@ -310,7 +310,8 @@ public class LeafQueue implements CSQueue {
         "state = " + state +
         " [= configuredState ]" + "\n" +
         "acls = " + aclsString +
-        " [= configuredAcls ]" + "\n");
+        " [= configuredAcls ]" + "\n" + 
+        "nodeLocalityDelay = " +  nodeLocalityDelay + "\n");
   }
   
   @Override
@@ -596,7 +597,8 @@ public class LeafQueue implements CSQueue {
         newlyParsedLeafQueue.getMaxApplicationsPerUser(),
         newlyParsedLeafQueue.getMaximumActiveApplications(), 
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
-        newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
+        newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
+        newlyParsedLeafQueue.getNodeLocalityDelay());
   }
 
   @Override

+ 35 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -129,6 +129,7 @@ public class TestLeafQueue {
   private static final String C = "c";
   private static final String C1 = "c1";
   private static final String D = "d";
+  private static final String E = "e";
   private void setupQueueConfiguration(
       CapacitySchedulerConfiguration conf, 
       final String newRoot) {
@@ -140,7 +141,7 @@ public class TestLeafQueue {
     conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
     
     final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
-    conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
+    conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
     conf.setCapacity(Q_newRoot, 100);
     conf.setMaximumCapacity(Q_newRoot, 100);
     conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
@@ -166,9 +167,14 @@ public class TestLeafQueue {
     conf.setCapacity(Q_C1, 100);
 
     final String Q_D = Q_newRoot + "." + D;
-    conf.setCapacity(Q_D, 10);
+    conf.setCapacity(Q_D, 9);
     conf.setMaximumCapacity(Q_D, 11);
     conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
+
+    final String Q_E = Q_newRoot + "." + E;
+    conf.setCapacity(Q_E, 1);
+    conf.setMaximumCapacity(Q_E, 1);
+    conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
     
   }
 
@@ -1582,6 +1588,33 @@ public class TestLeafQueue {
           c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
 
   }
+
+  @Test (timeout = 30000)
+  public void testNodeLocalityAfterQueueRefresh() throws Exception {
+
+    // Manipulate queue 'e'
+    LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
+
+    // before reinitialization
+    assertEquals(0, e.getNodeLocalityDelay());
+
+    csConf.setInt(CapacitySchedulerConfiguration
+        .NODE_LOCALITY_DELAY, 60);
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacityScheduler.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            CapacityScheduler.queueComparator,
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, cs.getClusterResources());
+
+    // after reinitialization
+    assertEquals(60, e.getNodeLocalityDelay());
+  }
+
   
   @After
   public void tearDown() throws Exception {