Browse Source

YARN-7544. Use queue-path.capacity/maximum-capacity to specify absolute min/max resources. (Sunil G via wangda)

Change-Id: I685341be213eee500f51e02f01c91def89391c17
Wangda Tan 7 years ago
parent
commit
50fca81b38

+ 60 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -329,7 +329,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
 
   public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
   public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
 
 
-  public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
+  public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "^\\[[\\w\\.,\\-_=\\ /]+\\]$";
+
+  private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
 
 
   public enum AbsoluteResourceType {
   public enum AbsoluteResourceType {
     MEMORY, VCORES;
     MEMORY, VCORES;
@@ -411,14 +413,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   }
   }
   
   
   public float getNonLabeledQueueCapacity(String queue) {
   public float getNonLabeledQueueCapacity(String queue) {
-    float capacity = queue.equals("root") ? 100.0f : getFloat(
-        getQueuePrefix(queue) + CAPACITY, 0f);
-    if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
-      throw new IllegalArgumentException("Illegal " +
-      		"capacity of " + capacity + " for queue " + queue);
+    String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
+    boolean matcher = (configuredCapacity != null)
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
+    if (matcher) {
+      // Return capacity in percentage as 0 for non-root queues and 100 for
+      // root.From AbstractCSQueue, absolute resource will be parsed and
+      // updated. Once nodes are added/removed in cluster, capacity in
+      // percentage will also be re-calculated.
+      return queue.equals("root") ? 100.0f : 0f;
     }
     }
-    LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue) + 
-        ", capacity=" + capacity);
+
+    float capacity = queue.equals("root")
+        ? 100.0f
+        : (configuredCapacity == null)
+            ? 0f
+            : Float.parseFloat(configuredCapacity);
+    if (capacity < MINIMUM_CAPACITY_VALUE
+        || capacity > MAXIMUM_CAPACITY_VALUE) {
+      throw new IllegalArgumentException(
+          "Illegal " + "capacity of " + capacity + " for queue " + queue);
+    }
+    LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue)
+        + ", capacity=" + capacity);
     return capacity;
     return capacity;
   }
   }
   
   
@@ -433,10 +450,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   }
   }
 
 
   public float getNonLabeledQueueMaximumCapacity(String queue) {
   public float getNonLabeledQueueMaximumCapacity(String queue) {
-    float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY,
-        MAXIMUM_CAPACITY_VALUE);
-    maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? 
-        MAXIMUM_CAPACITY_VALUE : maxCapacity;
+    String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
+    boolean matcher = (configuredCapacity != null)
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
+    if (matcher) {
+      // Return capacity in percentage as 0 for non-root queues and 100 for
+      // root.From AbstractCSQueue, absolute resource will be parsed and
+      // updated. Once nodes are added/removed in cluster, capacity in
+      // percentage will also be re-calculated.
+      return 100.0f;
+    }
+
+    float maxCapacity = (configuredCapacity == null)
+        ? MAXIMUM_CAPACITY_VALUE
+        : Float.parseFloat(configuredCapacity);
+    maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE)
+        ? MAXIMUM_CAPACITY_VALUE
+        : maxCapacity;
     return maxCapacity;
     return maxCapacity;
   }
   }
   
   
@@ -590,6 +620,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
   private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
       float defaultValue) {
       float defaultValue) {
     String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
     String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
+    boolean matcher = (capacityPropertyName != null)
+        && RESOURCE_PATTERN.matcher(capacityPropertyName).find();
+    if (matcher) {
+      // Return capacity in percentage as 0 for non-root queues and 100 for
+      // root.From AbstractCSQueue, absolute resource will be parsed and
+      // updated. Once nodes are added/removed in cluster, capacity in
+      // percentage will also be re-calculated.
+      return defaultValue;
+    }
+
     float capacity = getFloat(capacityPropertyName, defaultValue);
     float capacity = getFloat(capacityPropertyName, defaultValue);
     if (capacity < MINIMUM_CAPACITY_VALUE
     if (capacity < MINIMUM_CAPACITY_VALUE
         || capacity > MAXIMUM_CAPACITY_VALUE) {
         || capacity > MAXIMUM_CAPACITY_VALUE) {
@@ -1722,7 +1762,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public Resource getMinimumResourceRequirement(String label, String queue,
   public Resource getMinimumResourceRequirement(String label, String queue,
       Set<String> resourceTypes) {
       Set<String> resourceTypes) {
     return internalGetLabeledResourceRequirementForQueue(queue, label,
     return internalGetLabeledResourceRequirementForQueue(queue, label,
-        resourceTypes, MINIMUM_RESOURCE);
+        resourceTypes, CAPACITY);
   }
   }
 
 
   /**
   /**
@@ -1739,19 +1779,19 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public Resource getMaximumResourceRequirement(String label, String queue,
   public Resource getMaximumResourceRequirement(String label, String queue,
       Set<String> resourceTypes) {
       Set<String> resourceTypes) {
     return internalGetLabeledResourceRequirementForQueue(queue, label,
     return internalGetLabeledResourceRequirementForQueue(queue, label,
-        resourceTypes, MAXIMUM_RESOURCE);
+        resourceTypes, MAXIMUM_CAPACITY);
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
   public void setMinimumResourceRequirement(String label, String queue,
   public void setMinimumResourceRequirement(String label, String queue,
       Resource resource) {
       Resource resource) {
-    updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
+    updateMinMaxResourceToConf(label, queue, resource, CAPACITY);
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
   public void setMaximumResourceRequirement(String label, String queue,
   public void setMaximumResourceRequirement(String label, String queue,
       Resource resource) {
       Resource resource) {
-    updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
+    updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
   }
   }
 
 
   private void updateMinMaxResourceToConf(String label, String queue,
   private void updateMinMaxResourceToConf(String label, String queue,
@@ -1786,8 +1826,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
 
 
     // Define resource here.
     // Define resource here.
     Resource resource = Resource.newInstance(0l, 0);
     Resource resource = Resource.newInstance(0l, 0);
-    Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE)
-        .matcher(resourceString);
+    Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
+
     /*
     /*
      * Absolute resource configuration for a queue will be grouped by "[]".
      * Absolute resource configuration for a queue will be grouped by "[]".
      * Syntax of absolute resource config could be like below
      * Syntax of absolute resource config could be like below
@@ -1795,11 +1835,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
      */
      */
     if (matcher.find()) {
     if (matcher.find()) {
       // Get the sub-group.
       // Get the sub-group.
-      String subGroup = matcher.group(1);
+      String subGroup = matcher.group(0);
       if (subGroup.trim().isEmpty()) {
       if (subGroup.trim().isEmpty()) {
         return Resources.none();
         return Resources.none();
       }
       }
 
 
+      subGroup = subGroup.substring(1, subGroup.length() - 1);
       for (String kvPair : subGroup.trim().split(",")) {
       for (String kvPair : subGroup.trim().split(",")) {
         String[] splits = kvPair.split("=");
         String[] splits = kvPair.split("=");
 
 

+ 14 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.Test;
 
 
@@ -233,7 +232,12 @@ public class TestAbsoluteResourceConfiguration {
   @Test
   @Test
   public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
   public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
     /**
     /**
-     * Queue structure is as follows. root / | \ a b c / \ | a1 a2 b1
+     * Queue structure is as follows.
+     *    root
+     *   / | \
+     *   a b c
+     *   / \ |
+     *  a1 a2 b1
      *
      *
      * Test below cases 1) Configure percentage based capacity and absolute
      * Test below cases 1) Configure percentage based capacity and absolute
      * resource together. 2) As per above tree structure, ensure all values
      * resource together. 2) As per above tree structure, ensure all values
@@ -258,21 +262,15 @@ public class TestAbsoluteResourceConfiguration {
     // Get queue object to verify min/max resource configuration.
     // Get queue object to verify min/max resource configuration.
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
 
 
-    // 1. Create a new config with capcity and min/max together. Ensure an
-    // exception is thrown.
+    // 1. Create a new config with min/max.
     CapacitySchedulerConfiguration csConf1 = setupSimpleQueueConfiguration(
     CapacitySchedulerConfiguration csConf1 = setupSimpleQueueConfiguration(
         true);
         true);
     setupMinMaxResourceConfiguration(csConf1);
     setupMinMaxResourceConfiguration(csConf1);
 
 
     try {
     try {
       cs.reinitialize(csConf1, rm.getRMContext());
       cs.reinitialize(csConf1, rm.getRMContext());
-      Assert.fail();
     } catch (IOException e) {
     } catch (IOException e) {
-      Assert.assertTrue(e instanceof IOException);
-      Assert.assertEquals(
-          "Failed to re-init queues : Queue 'queueA' should use either"
-              + " percentage based capacity configuration or absolute resource.",
-          e.getMessage());
+      Assert.fail();
     }
     }
     rm.stop();
     rm.stop();
 
 
@@ -368,7 +366,12 @@ public class TestAbsoluteResourceConfiguration {
   @Test
   @Test
   public void testComplexValidateAbsoluteResourceConfig() throws Exception {
   public void testComplexValidateAbsoluteResourceConfig() throws Exception {
     /**
     /**
-     * Queue structure is as follows. root / | \ a b c / \ | a1 a2 b1
+     * Queue structure is as follows.
+     *   root
+     *  / | \
+     *  a b c
+     * / \ |
+     * a1 a2 b1
      *
      *
      * Test below cases: 1) Parent and its child queues must use either
      * Test below cases: 1) Parent and its child queues must use either
      * percentage based or absolute resource configuration. 2) Parent's min
      * percentage based or absolute resource configuration. 2) Parent's min
@@ -396,11 +399,6 @@ public class TestAbsoluteResourceConfiguration {
     csConf.setCapacity(QUEUEB_FULL, 25f);
     csConf.setCapacity(QUEUEB_FULL, 25f);
     csConf.setCapacity(QUEUEC_FULL, 25f);
     csConf.setCapacity(QUEUEC_FULL, 25f);
 
 
-    // Also unset resource based config.
-    csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resources.none());
-    csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resources.none());
-    csConf.setMinimumResourceRequirement("", QUEUEC_FULL, Resources.none());
-
     // Get queue object to verify min/max resource configuration.
     // Get queue object to verify min/max resource configuration.
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
     try {
     try {