瀏覽代碼

YARN-2824. Fixed Capacity Scheduler to not crash when some node-labels are not mapped to queues by making default capacities per label to be zero. Contributed by Wangda Tan.

(cherry picked from commit 2ac1be7dec4aef001e3162e364249933b2c4a6c4)
Vinod Kumar Vavilapalli 10 年之前
父節點
當前提交
71a18a5303

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -822,6 +822,10 @@ Release 2.6.0 - UNRELEASED
 
 
     YARN-2810. TestRMProxyUsersConf fails on Windows VMs. (Varun Vasudev via xgong)
     YARN-2810. TestRMProxyUsersConf fails on Windows VMs. (Varun Vasudev via xgong)
 
 
+    YARN-2824. Fixed Capacity Scheduler to not crash when some node-labels are
+    not mapped to queues by making default capacities per label to be zero.
+    (Wangda Tan via vinodkv)
+
 Release 2.5.2 - UNRELEASED
 Release 2.5.2 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -202,6 +202,7 @@
       <Field name="accessibleLabels" />
       <Field name="accessibleLabels" />
       <Field name="absoluteNodeLabelCapacities" />
       <Field name="absoluteNodeLabelCapacities" />
       <Field name="reservationsContinueLooking" />
       <Field name="reservationsContinueLooking" />
+      <Field name="absoluteCapacityByNodeLabels" />
     </Or>
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
   </Match>

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -86,6 +86,8 @@ public class TestDistributedShell {
 
 
     // Setup queue access to node labels
     // Setup queue access to node labels
     conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
     conf.set("yarn.scheduler.capacity.root.accessible-node-labels", "x");
+    conf.set("yarn.scheduler.capacity.root.accessible-node-labels.x.capacity",
+        "100");
     conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
     conf.set("yarn.scheduler.capacity.root.default.accessible-node-labels", "x");
     conf.set(
     conf.set(
         "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",
         "yarn.scheduler.capacity.root.default.accessible-node-labels.x.capacity",

+ 15 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -395,16 +395,15 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   
   @Private
   @Private
   public float getCapacityByNodeLabel(String label) {
   public float getCapacityByNodeLabel(String label) {
-    if (null == parent) {
-      return 1f;
-    }
-    
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      if (null == parent) {
+        return 1f;
+      }
       return getCapacity();
       return getCapacity();
     }
     }
     
     
     if (!capacitiyByNodeLabels.containsKey(label)) {
     if (!capacitiyByNodeLabels.containsKey(label)) {
-      return 0;
+      return 0f;
     } else {
     } else {
       return capacitiyByNodeLabels.get(label);
       return capacitiyByNodeLabels.get(label);
     }
     }
@@ -412,18 +411,17 @@ public abstract class AbstractCSQueue implements CSQueue {
   
   
   @Private
   @Private
   public float getAbsoluteCapacityByNodeLabel(String label) {
   public float getAbsoluteCapacityByNodeLabel(String label) {
-    if (null == parent) {
-      return 1; 
-    }
-    
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
     if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      if (null == parent) {
+        return 1f; 
+      }
       return getAbsoluteCapacity();
       return getAbsoluteCapacity();
     }
     }
     
     
-    if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
-      return 0;
+    if (!absoluteCapacityByNodeLabels.containsKey(label)) {
+      return 0f;
     } else {
     } else {
-      return absoluteMaxCapacityByNodeLabels.get(label);
+      return absoluteCapacityByNodeLabels.get(label);
     }
     }
   }
   }
   
   
@@ -433,7 +431,11 @@ public abstract class AbstractCSQueue implements CSQueue {
       return getAbsoluteMaximumCapacity();
       return getAbsoluteMaximumCapacity();
     }
     }
     
     
-    return getAbsoluteCapacityByNodeLabel(label);
+    if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
+      return 0f;
+    } else {
+      return absoluteMaxCapacityByNodeLabels.get(label);
+    }
   }
   }
   
   
   @Private
   @Private

+ 2 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -461,19 +461,8 @@ public class CapacitySchedulerConfiguration extends Configuration {
 
 
     for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
     for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
         .getClusterNodeLabels() : labels) {
         .getClusterNodeLabels() : labels) {
-      // capacity of all labels in each queue should be 1
-      if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
-        nodeLabelCapacities.put(label, 1.0f);
-        continue;
-      }
       String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
       String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
-      float capacity = getFloat(capacityPropertyName, UNDEFINED);
-      if (capacity == UNDEFINED) {
-        throw new IllegalArgumentException("Configuration issue: "
-            + " node-label=" + label + " is accessible from queue=" + queue
-            + " but has no capacity set, you should set " 
-            + capacityPropertyName + " in range of [0, 100].");
-      }
+      float capacity = getFloat(capacityPropertyName, 0f);
       if (capacity < MINIMUM_CAPACITY_VALUE
       if (capacity < MINIMUM_CAPACITY_VALUE
           || capacity > MAXIMUM_CAPACITY_VALUE) {
           || capacity > MAXIMUM_CAPACITY_VALUE) {
         throw new IllegalArgumentException("Illegal capacity of " + capacity
         throw new IllegalArgumentException("Illegal capacity of " + capacity
@@ -501,9 +490,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
         .getClusterNodeLabels() : labels) {
         .getClusterNodeLabels() : labels) {
       float maxCapacity =
       float maxCapacity =
           getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
           getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
-              UNDEFINED);
-      maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? 
-          MAXIMUM_CAPACITY_VALUE : maxCapacity;
+              100f);
       if (maxCapacity < MINIMUM_CAPACITY_VALUE
       if (maxCapacity < MINIMUM_CAPACITY_VALUE
           || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
           || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
         throw new IllegalArgumentException("Illegal " + "capacity of "
         throw new IllegalArgumentException("Illegal " + "capacity of "

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -340,6 +340,8 @@ public class TestContainerAllocation {
     
     
     // Define top-level queues
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
 
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
     conf.setCapacity(A, 10);
@@ -403,6 +405,9 @@ public class TestContainerAllocation {
     
     
     // Define top-level queues
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
 
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
     conf.setCapacity(A, 10);

+ 78 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java

@@ -18,10 +18,6 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.IOException;
 import java.io.IOException;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
@@ -51,8 +47,9 @@ public class TestQueueParsing {
   
   
   @Before
   @Before
   public void setup() {
   public void setup() {
-    nodeLabelManager = mock(RMNodeLabelsManager.class);
-    when(nodeLabelManager.containsNodeLabel(any(String.class))).thenReturn(true);
+    nodeLabelManager = new MemoryRMNodeLabelsManager();
+    nodeLabelManager.init(new YarnConfiguration());
+    nodeLabelManager.start();
   }
   }
   
   
   @Test
   @Test
@@ -255,6 +252,8 @@ public class TestQueueParsing {
   private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
   private void setupQueueConfigurationWithLabels(CapacitySchedulerConfiguration conf) {
     // Define top-level queues
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
 
 
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     conf.setCapacity(A, 10);
     conf.setCapacity(A, 10);
@@ -271,6 +270,7 @@ public class TestQueueParsing {
     conf.setQueues(A, new String[] {"a1", "a2"});
     conf.setQueues(A, new String[] {"a1", "a2"});
     conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
     conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue"));
     conf.setCapacityByLabel(A, "red", 50);
     conf.setCapacityByLabel(A, "red", 50);
+    conf.setMaximumCapacityByLabel(A, "red", 50);
     conf.setCapacityByLabel(A, "blue", 50);
     conf.setCapacityByLabel(A, "blue", 50);
     
     
     conf.setCapacity(A1, 30);
     conf.setCapacity(A1, 30);
@@ -282,6 +282,7 @@ public class TestQueueParsing {
     conf.setMaximumCapacity(A2, 85);
     conf.setMaximumCapacity(A2, 85);
     conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
     conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red"));
     conf.setCapacityByLabel(A2, "red", 50);
     conf.setCapacityByLabel(A2, "red", 50);
+    conf.setMaximumCapacityByLabel(A2, "red", 60);
     
     
     final String B1 = B + ".b1";
     final String B1 = B + ".b1";
     final String B2 = B + ".b2";
     final String B2 = B + ".b2";
@@ -311,6 +312,8 @@ public class TestQueueParsing {
       CapacitySchedulerConfiguration conf) {
       CapacitySchedulerConfiguration conf) {
     // Define top-level queues
     // Define top-level queues
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
     conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100);
 
 
     // Set A configuration
     // Set A configuration
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
     final String A = CapacitySchedulerConfiguration.ROOT + ".a";
@@ -364,6 +367,7 @@ public class TestQueueParsing {
   
   
   @Test
   @Test
   public void testQueueParsingReinitializeWithLabels() throws IOException {
   public void testQueueParsingReinitializeWithLabels() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
     CapacitySchedulerConfiguration csConf =
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration();
         new CapacitySchedulerConfiguration();
     setupQueueConfigurationWithoutLabels(csConf);
     setupQueueConfigurationWithoutLabels(csConf);
@@ -410,6 +414,22 @@ public class TestQueueParsing {
     // queue-B2 inherits "red"/"blue"
     // queue-B2 inherits "red"/"blue"
     Assert.assertTrue(capacityScheduler.getQueue("b2")
     Assert.assertTrue(capacityScheduler.getQueue("b2")
         .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
         .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue")));
+    
+    // check capacity of A2
+    CSQueue qA2 = capacityScheduler.getQueue("a2");
+    Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
+    Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
+    Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
+    
+    // check capacity of B3
+    CSQueue qB3 = capacityScheduler.getQueue("b3");
+    Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
+    Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
   }
   }
   
   
   private void
   private void
@@ -435,6 +455,8 @@ public class TestQueueParsing {
   
   
   @Test
   @Test
   public void testQueueParsingWithLabels() throws IOException {
   public void testQueueParsingWithLabels() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
+    
     YarnConfiguration conf = new YarnConfiguration();
     YarnConfiguration conf = new YarnConfiguration();
     CapacitySchedulerConfiguration csConf =
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration(conf);
         new CapacitySchedulerConfiguration(conf);
@@ -457,6 +479,8 @@ public class TestQueueParsing {
   
   
   @Test
   @Test
   public void testQueueParsingWithLabelsInherit() throws IOException {
   public void testQueueParsingWithLabelsInherit() throws IOException {
+    nodeLabelManager.addToCluserNodeLabels(ImmutableSet.of("red", "blue"));
+
     YarnConfiguration conf = new YarnConfiguration();
     YarnConfiguration conf = new YarnConfiguration();
     CapacitySchedulerConfiguration csConf =
     CapacitySchedulerConfiguration csConf =
         new CapacitySchedulerConfiguration(conf);
         new CapacitySchedulerConfiguration(conf);
@@ -587,4 +611,52 @@ public class TestQueueParsing {
     ServiceOperations.stopQuietly(capacityScheduler);
     ServiceOperations.stopQuietly(capacityScheduler);
     ServiceOperations.stopQuietly(nodeLabelsManager);
     ServiceOperations.stopQuietly(nodeLabelsManager);
   }
   }
+  
+  @Test
+  public void testQueueParsingWithUnusedLabels() throws IOException {
+    final ImmutableSet<String> labels = ImmutableSet.of("red", "blue");
+    
+    // Initialize a cluster with labels, but doesn't use them, reinitialize
+    // shouldn't fail
+    nodeLabelManager.addToCluserNodeLabels(labels);
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    csConf.setAccessibleNodeLabels(CapacitySchedulerConfiguration.ROOT, labels);
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+
+    CapacityScheduler capacityScheduler = new CapacityScheduler();
+    capacityScheduler.setConf(conf);
+    RMContextImpl rmContext =
+        new RMContextImpl(null, null, null, null, null, null,
+            new RMContainerTokenSecretManager(csConf),
+            new NMTokenSecretManagerInRM(csConf),
+            new ClientToAMTokenSecretManagerInRM(), null);
+    rmContext.setNodeLabelManager(nodeLabelManager);
+    capacityScheduler.setRMContext(rmContext);
+    capacityScheduler.init(conf);
+    capacityScheduler.start();
+    capacityScheduler.reinitialize(conf, rmContext);
+    
+    // check root queue's capacity by label -- they should be all zero
+    CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
+    Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA);
+    Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA);
+
+    CSQueue a = capacityScheduler.getQueue("a");
+    Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.15, a.getAbsoluteMaximumCapacity(), DELTA);
+
+    CSQueue b1 = capacityScheduler.getQueue("b1");
+    Assert.assertEquals(0.2 * 0.5, b1.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals("Parent B has no MAX_CAP", 0.85,
+        b1.getAbsoluteMaximumCapacity(), DELTA);
+
+    CSQueue c12 = capacityScheduler.getQueue("c12");
+    Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA);
+    Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(),
+        DELTA);
+    capacityScheduler.stop();
+  }
 }
 }