Просмотр исходного кода

YARN-8858. CapacityScheduler should respect maximum node resource when per-queue maximum-allocation is being used. Contributed by Wangda Tan.

(cherry picked from commit edce866489d83744f3f47a3b884b0c6136885e4a)
Weiwei Yang 6 лет назад
Родитель
Сommit
e75191a62e

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -228,6 +229,16 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
     }
   }
 
+  @VisibleForTesting
+  public void setForceConfiguredMaxAllocation(boolean flag) {
+    writeLock.lock();
+    try {
+      forceConfiguredMaxAllocation = flag;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   private void updateMaxResources(SchedulerNode node, boolean add) {
     Resource totalResource = node.getTotalResource();
     writeLock.lock();

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2199,7 +2199,17 @@ public class CapacityScheduler extends
       LOG.error("queue " + queueName + " is not an leaf queue");
       return getMaximumResourceCapability();
     }
-    return ((LeafQueue)queue).getMaximumAllocation();
+
+    // queue.getMaxAllocation returns *configured* maximum allocation.
+    // getMaximumResourceCapability() returns maximum allocation considers
+    // per-node maximum resources. So return (component-wise) min of the two.
+
+    Resource queueMaxAllocation = ((LeafQueue)queue).getMaximumAllocation();
+    Resource clusterMaxAllocationConsiderNodeMax =
+        getMaximumResourceCapability();
+
+    return Resources.componentwiseMin(queueMaxAllocation,
+        clusterMaxAllocationConsiderNodeMax);
   }
 
   private String handleMoveToPlanQueue(String targetQueueName) {

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -65,6 +66,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
 
 public class TestContainerAllocation {
 
@@ -1062,4 +1064,54 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testContainerRejectionWhenAskBeyondDynamicMax()
+      throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        (CapacitySchedulerConfiguration) TestUtils
+            .getConfigurationWithMultipleQueues(conf);
+    newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+        DominantResourceCalculator.class, ResourceCalculator.class);
+    newConf.set(CapacitySchedulerConfiguration.getQueuePrefix("root.a")
+        + MAXIMUM_ALLOCATION_MB, "4096");
+
+    MockRM rm1 = new MockRM(newConf);
+    rm1.start();
+
+    // before any node registered or before registration timeout,
+    // submit an app beyond queue max leads to failure.
+    boolean submitFailed = false;
+    MockNM nm1 = rm1.registerNode("h1:1234", 2 * GB, 1);
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    try {
+      am1.allocate("*", 5 * GB, 1, null);
+    } catch (InvalidResourceRequestException e) {
+      submitFailed = true;
+    }
+    Assert.assertTrue(submitFailed);
+
+    // Ask 4GB succeeded.
+    am1.allocate("*", 4 * GB, 1, null);
+
+    // Add a new node, now the cluster maximum should be refreshed to 3GB.
+    CapacityScheduler cs = (CapacityScheduler)rm1.getResourceScheduler();
+    cs.getNodeTracker().setForceConfiguredMaxAllocation(false);
+    rm1.registerNode("h2:1234", 3 * GB, 1);
+
+    // Now ask 4 GB will fail
+    submitFailed = false;
+    try {
+      am1.allocate("*", 4 * GB, 1, null);
+    } catch (InvalidResourceRequestException e) {
+      submitFailed = true;
+    }
+    Assert.assertTrue(submitFailed);
+
+    // But ask 3 GB succeeded.
+    am1.allocate("*", 3 * GB, 1, null);
+
+    rm1.close();
+  }
 }