Prechádzať zdrojové kódy

YARN-9204. RM fails to start if absolute resource is specified for partition capacity in CS queues. Contributed by Jiandan Yang.

(cherry picked from commit abde1e1f58d5b699e4b8e460cff68e154738169b)
Weiwei Yang 6 rokov pred
rodič
commit
ac2f4b64f9

+ 21 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -474,6 +474,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         ", capacity=" + capacity);
   }
 
+  @VisibleForTesting
+  public void setCapacity(String queue, String absoluteResourceCapacity) {
+    if (queue.equals("root")) {
+      throw new IllegalArgumentException(
+          "Cannot set capacity, root queue has a fixed capacity");
+    }
+    set(getQueuePrefix(queue) + CAPACITY, absoluteResourceCapacity);
+    LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue)
+        + ", capacity=" + absoluteResourceCapacity);
+  }
+
   public float getNonLabeledQueueMaximumCapacity(String queue) {
     String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
     boolean matcher = (configuredCapacity != null)
@@ -508,7 +519,13 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public void setCapacityByLabel(String queue, String label, float capacity) {
     setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
   }
-  
+
+  @VisibleForTesting
+  public void setCapacityByLabel(String queue, String label,
+                                 String absoluteResourceCapacity) {
+    set(getNodeLabelPrefix(queue, label) + CAPACITY, absoluteResourceCapacity);
+  }
+
   public void setMaximumCapacityByLabel(String queue, String label,
       float capacity) {
     setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
@@ -645,8 +662,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
       float defaultValue) {
     String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
-    boolean matcher = (capacityPropertyName != null)
-        && RESOURCE_PATTERN.matcher(capacityPropertyName).find();
+    String configuredCapacity = get(capacityPropertyName);
+    boolean matcher = (configuredCapacity != null)
+        && RESOURCE_PATTERN.matcher(configuredCapacity).find();
     if (matcher) {
       // Return capacity in percentage as 0 for non-root queues and 100 for
       // root.From AbstractCSQueue, absolute resource will be parsed and

+ 41 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -42,6 +42,7 @@ import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -123,6 +124,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
@@ -991,6 +993,45 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
       new ClientToAMTokenSecretManagerInRM(), null));
   }
 
+  @Test
+  public void testParseQueueWithAbsoluteResource() {
+    String childQueue = "testQueue";
+    String labelName = "testLabel";
+
+    CapacityScheduler cs = new CapacityScheduler();
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+
+    conf.setQueues("root", new String[] {childQueue});
+    conf.setCapacity("root." + childQueue, "[memory=20480,vcores=200]");
+    conf.setAccessibleNodeLabels("root." + childQueue,
+        Sets.newHashSet(labelName));
+    conf.setCapacityByLabel("root", labelName, "[memory=10240,vcores=100]");
+    conf.setCapacityByLabel("root." + childQueue, labelName,
+        "[memory=4096,vcores=10]");
+
+    cs.init(conf);
+    cs.start();
+
+    Resource rootQueueLableCapacity =
+        cs.getQueue("root").getQueueResourceQuotas()
+            .getConfiguredMinResource(labelName);
+    assertEquals(10240, rootQueueLableCapacity.getMemorySize());
+    assertEquals(100, rootQueueLableCapacity.getVirtualCores());
+
+    QueueResourceQuotas childQueueQuotas =
+        cs.getQueue(childQueue).getQueueResourceQuotas();
+    Resource childQueueCapacity = childQueueQuotas.getConfiguredMinResource();
+    assertEquals(20480, childQueueCapacity.getMemorySize());
+    assertEquals(200, childQueueCapacity.getVirtualCores());
+
+    Resource childQueueLabelCapacity =
+        childQueueQuotas.getConfiguredMinResource(labelName);
+    assertEquals(4096, childQueueLabelCapacity.getMemorySize());
+    assertEquals(10, childQueueLabelCapacity.getVirtualCores());
+  }
+
   @Test
   public void testReconnectedNode() throws Exception {
     CapacitySchedulerConfiguration csConf =