Explorar el Código

YARN-9204. RM fails to start if absolute resource is specified for partition capacity in CS queues. Contributed by Jiandan Yang.

(cherry picked from commit abde1e1f58d5b699e4b8e460cff68e154738169b)
(cherry picked from commit 4edd883d4818dee3bcf2f8dea3e3d8ebf35d6605)
Weiwei Yang hace 6 años
padre
commit
211e2caef2

+ 21 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -471,6 +471,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         ", capacity=" + capacity);
   }
 
+  @VisibleForTesting
+  public void setCapacity(String queue, String absoluteResourceCapacity) {
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set capacity, root queue has a fixed capacity");
+    }
+    set(getQueuePrefix(queue) + CAPACITY, absoluteResourceCapacity);
+    LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue)
+        + ", capacity=" + absoluteResourceCapacity);
+  }
+
   public float getNonLabeledQueueMaximumCapacity(String queue) {
     String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
     boolean matcher = (configuredCapacity != null)
@@ -505,7 +516,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public void setCapacityByLabel(String queue, String label, float capacity) {
     setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
   }
-  
+
+  @VisibleForTesting
+  public void setCapacityByLabel(String queue, String label,
+                                 String absoluteResourceCapacity) {
+    set(getNodeLabelPrefix(queue, label) + CAPACITY, absoluteResourceCapacity);
+  }
+
   public void setMaximumCapacityByLabel(String queue, String label,
       float capacity) {
     setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
@@ -642,8 +659,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
       float defaultValue) {
     String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
-    boolean matcher = (capacityPropertyName != null)
-        && RESOURCE_PATTERN.matcher(capacityPropertyName).find();
+    String configuredCapacity = get(capacityPropertyName);
+    boolean matcher = (configuredCapacity != null)
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
     if (matcher) {
       // Return capacity in percentage as 0 for non-root queues and 100 for
       // root.From AbstractCSQueue, absolute resource will be parsed and

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -40,6 +40,7 @@ import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -123,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
@@ -957,6 +959,45 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
       new ClientToAMTokenSecretManagerInRM(), null));
   }
 
+  @Test
+  public void testParseQueueWithAbsoluteResource() {
+    String childQueue = "testQueue";
+    String labelName = "testLabel";
+
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+    conf.setQueues("root", new String[] {childQueue});
+    conf.setCapacity("root." + childQueue, "[memory=20480,vcores=200]");
+    conf.setAccessibleNodeLabels("root." + childQueue,
+        Sets.newHashSet(labelName));
+    conf.setCapacityByLabel("root", labelName, "[memory=10240,vcores=100]");
+    conf.setCapacityByLabel("root." + childQueue, labelName,
+        "[memory=4096,vcores=10]");
+
+    cs.init(conf);
+    cs.start();
+
+    Resource rootQueueLableCapacity =
+        cs.getQueue("root").getQueueResourceQuotas()
+            .getConfiguredMinResource(labelName);
+    assertEquals(10240, rootQueueLableCapacity.getMemorySize());
+    assertEquals(100, rootQueueLableCapacity.getVirtualCores());
+
+    QueueResourceQuotas childQueueQuotas =
+        cs.getQueue(childQueue).getQueueResourceQuotas();
+    Resource childQueueCapacity = childQueueQuotas.getConfiguredMinResource();
+    assertEquals(20480, childQueueCapacity.getMemorySize());
+    assertEquals(200, childQueueCapacity.getVirtualCores());
+
+    Resource childQueueLabelCapacity =
+        childQueueQuotas.getConfiguredMinResource(labelName);
+    assertEquals(4096, childQueueLabelCapacity.getMemorySize());
+    assertEquals(10, childQueueLabelCapacity.getVirtualCores());
+  }
+
   @Test
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration csConf =