Browse Source

Merge -c 1241659 from trunk to branch-0.23 to fix MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1241660 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 years ago
parent
commit
e4d7448b7f

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -705,6 +705,9 @@ Release 0.23.1 - Unreleased
     requesting containers so that scheduler can give off data local containers
     requesting containers so that scheduler can give off data local containers
     correctly. (Siddarth Seth via vinodkv)
     correctly. (Siddarth Seth via vinodkv)
  
  
+    MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. (Jason Lowe via
+    acmurthy) 
+
 Release 0.23.0 - 2011-11-01 
 Release 0.23.0 - 2011-11-01 
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 6 - 6
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -368,6 +368,12 @@ public class ParentQueue implements CSQueue {
 
 
     ParentQueue parentQueue = (ParentQueue)queue;
     ParentQueue parentQueue = (ParentQueue)queue;
 
 
+    // Set new configs
+    setupQueueConfigs(clusterResource,
+        parentQueue.capacity, parentQueue.absoluteCapacity,
+        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
+        parentQueue.state, parentQueue.acls);
+
     // Re-configure existing child queues and add new ones
     // Re-configure existing child queues and add new ones
     // The CS has already checked to ensure all existing child queues are present!
     // The CS has already checked to ensure all existing child queues are present!
     Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
     Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
@@ -389,12 +395,6 @@ public class ParentQueue implements CSQueue {
     // Re-sort all queues
     // Re-sort all queues
     childQueues.clear();
     childQueues.clear();
     childQueues.addAll(currentChildQueues.values());
     childQueues.addAll(currentChildQueues.values());
-
-    // Set new configs
-    setupQueueConfigs(clusterResource,
-        parentQueue.capacity, parentQueue.absoluteCapacity,
-        parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
-        parentQueue.state, parentQueue.acls);
   }
   }
 
 
   Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
   Map<String, CSQueue> getQueues(Set<CSQueue> queues) {

+ 99 - 16
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -18,11 +18,12 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
+import static org.junit.Assert.assertEquals;
 
 
 import java.io.IOException;
 import java.io.IOException;
+import java.util.List;
 
 
 import junit.framework.Assert;
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,21 @@ import org.junit.Test;
 public class TestCapacityScheduler {
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
   
   
+  private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+  private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+  private static final String A1 = A + ".a1";
+  private static final String A2 = A + ".a2";
+  private static final String B1 = B + ".b1";
+  private static final String B2 = B + ".b2";
+  private static final String B3 = B + ".b3";
+  private static int A_CAPACITY = 10;
+  private static int B_CAPACITY = 90;
+  private static int A1_CAPACITY = 30;
+  private static int A2_CAPACITY = 70;
+  private static int B1_CAPACITY = 50;
+  private static int B2_CAPACITY = 30;
+  private static int B3_CAPACITY = 20;
+
   private ResourceManager resourceManager = null;
   private ResourceManager resourceManager = null;
   
   
   @Before
   @Before
@@ -200,35 +216,102 @@ public class TestCapacityScheduler {
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
     conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
     
     
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    conf.setCapacity(A, 10);
-    
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    conf.setCapacity(B, 90);
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
     
     
     // Define 2nd-level queues
     // Define 2nd-level queues
-    final String A1 = A + ".a1";
-    final String A2 = A + ".a2";
     conf.setQueues(A, new String[] {"a1", "a2"});
     conf.setQueues(A, new String[] {"a1", "a2"});
-    conf.setCapacity(A1, 30);
+    conf.setCapacity(A1, A1_CAPACITY);
     conf.setUserLimitFactor(A1, 100.0f);
     conf.setUserLimitFactor(A1, 100.0f);
-    conf.setCapacity(A2, 70);
+    conf.setCapacity(A2, A2_CAPACITY);
     conf.setUserLimitFactor(A2, 100.0f);
     conf.setUserLimitFactor(A2, 100.0f);
     
     
-    final String B1 = B + ".b1";
-    final String B2 = B + ".b2";
-    final String B3 = B + ".b3";
     conf.setQueues(B, new String[] {"b1", "b2", "b3"});
     conf.setQueues(B, new String[] {"b1", "b2", "b3"});
-    conf.setCapacity(B1, 50);
+    conf.setCapacity(B1, B1_CAPACITY);
     conf.setUserLimitFactor(B1, 100.0f);
     conf.setUserLimitFactor(B1, 100.0f);
-    conf.setCapacity(B2, 30);
+    conf.setCapacity(B2, B2_CAPACITY);
     conf.setUserLimitFactor(B2, 100.0f);
     conf.setUserLimitFactor(B2, 100.0f);
-    conf.setCapacity(B3, 20);
+    conf.setCapacity(B3, B3_CAPACITY);
     conf.setUserLimitFactor(B3, 100.0f);
     conf.setUserLimitFactor(B3, 100.0f);
 
 
     LOG.info("Setup top-level queues a and b");
     LOG.info("Setup top-level queues a and b");
   }
   }
   
   
+  @Test
+  public void testRefreshQueues() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.reinitialize(conf, null, null);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    conf.setCapacity(A, 80);
+    conf.setCapacity(B, 20);
+    cs.reinitialize(conf, null,null);
+    checkQueueCapacities(cs, 80, 20);
+  }
+
+  private void checkQueueCapacities(CapacityScheduler cs,
+      int capacityA, int capacityB) {
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueA = findQueue(rootQueue, A);
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueA1 = findQueue(queueA, A1);
+    CSQueue queueA2 = findQueue(queueA, A2);
+    CSQueue queueB1 = findQueue(queueB, B1);
+    CSQueue queueB2 = findQueue(queueB, B2);
+    CSQueue queueB3 = findQueue(queueB, B3);
+
+    float capA = capacityA / 100.0f;
+    float capB = capacityB / 100.0f;
+
+    checkQueueCapacity(queueA, capA, capA, 1.0f, 1.0f);
+    checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f);
+    checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f,
+        (A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
+    checkQueueCapacity(queueA2, (float)A2_CAPACITY / 100.0f,
+        (A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
+    checkQueueCapacity(queueB1, (float)B1_CAPACITY / 100.0f,
+        (B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
+    checkQueueCapacity(queueB2, (float)B2_CAPACITY / 100.0f,
+        (B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
+    checkQueueCapacity(queueB3, (float)B3_CAPACITY / 100.0f,
+        (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
+  }
+
+  private void checkQueueCapacity(CSQueue q, float expectedCapacity,
+      float expectedAbsCapacity, float expectedMaxCapacity,
+      float expectedAbsMaxCapacity) {
+    final float epsilon = 1e-5f;
+    assertEquals("capacity", expectedCapacity, q.getCapacity(), epsilon);
+    assertEquals("absolute capacity", expectedAbsCapacity,
+        q.getAbsoluteCapacity(), epsilon);
+    assertEquals("maximum capacity", expectedMaxCapacity,
+        q.getMaximumCapacity(), epsilon);
+    assertEquals("absolute maximum capacity", expectedAbsMaxCapacity,
+        q.getAbsoluteMaximumCapacity(), epsilon);
+  }
+
+  private CSQueue findQueue(CSQueue root, String queuePath) {
+    if (root.getQueuePath().equals(queuePath)) {
+      return root;
+    }
+
+    List<CSQueue> childQueues = root.getChildQueues();
+    if (childQueues != null) {
+      for (CSQueue q : childQueues) {
+        if (queuePath.startsWith(q.getQueuePath())) {
+          CSQueue result = findQueue(q, queuePath);
+          if (result != null) {
+            return result;
+          }
+        }
+      }
+    }
+
+    return null;
+  }
+
   private void checkApplicationResourceUsage(int expected, 
   private void checkApplicationResourceUsage(int expected, 
       Application application) {
       Application application) {
     Assert.assertEquals(expected, application.getUsedResources().getMemory());
     Assert.assertEquals(expected, application.getUsedResources().getMemory());