Przeglądaj źródła

YARN-10935. AM Total Queue Limit goes below per-user AM Limit if parent is full. Contributed by Eric Payne.

(cherry picked from commit ce3def0aa790ba5f29b2627dba89c169d0903291)
Eric Badger 3 lat temu
rodzic
commit
043c380350

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -764,8 +764,13 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Current usable resource for this queue and partition is the max of
       // queueCurrentLimit and queuePartitionResource.
-      Resource queuePartitionUsableResource = Resources.max(resourceCalculator,
-          lastClusterResource, queueCurrentLimit, queuePartitionResource);
+      // If any of the resources available to this queue are less than queue's
+      // guarantee, use the guarantee as the queuePartitionUsableResource
+      // because nothing less than the queue's guarantee should be used when
+      // calculating the AM limit.
+      Resource queuePartitionUsableResource = (Resources.fitsIn(
+          resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
+              queueCurrentLimit : queuePartitionResource;
 
       Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
           resourceCalculator, queuePartitionUsableResource, amResourcePercent,

+ 74 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
@@ -880,4 +881,77 @@ public class TestApplicationLimits {
     rm.killApp(app14.getApplicationId());
     rm.stop();
   }
+
+  // Test that max AM limit is correct in the case where one resource is
+  // depleted but the other is not. Use DominantResourceCalculator.
+  @Test
+  public void testAMResourceLimitWithDRCAndFullParent() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(csConf);
+    csConf.setFloat(CapacitySchedulerConfiguration.
+        MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.3f);
+    YarnConfiguration conf = new YarnConfiguration();
+
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability()).
+        thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability()).
+        thenReturn(Resources.createResource(16*GB));
+    when(csContext.getResourceCalculator()).
+        thenReturn(new DominantResourceCalculator());
+    when(csContext.getRMContext()).thenReturn(rmContext);
+    when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
+
+    // Total cluster resources.
+    Resource clusterResource = Resources.createResource(100 * GB, 1000);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // Set up queue hierarchy.
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
+        csConf, null, "root", queues, queues, TestUtils.spyHook);
+    rootQueue.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Queue "queueA" has a 30% capacity guarantee. The max pct of "queueA" that
+    // can be used for AMs is 30%. So, 30% of <memory: 100GB, vCores: 1000> is
+    // <memory: 30GB, vCores: 30>, which is the guaranteed capacity of "queueA".
+    // 30% of that (rounded to the nearest 1GB) is <memory: 9GB, vCores: 9>. The
+    // max AM queue limit should never be less than that for any resource.
+    LeafQueue queueA = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
+    queueA.setCapacity(30.0f);
+    queueA.setUserLimitFactor(10f);
+    queueA.setMaxAMResourcePerQueuePercent(0.3f);
+    // Make sure "queueA" knows the total cluster resource.
+    queueA.updateClusterResource(clusterResource, new ResourceLimits(
+        clusterResource));
+    // Get "queueA"'s guaranteed capacity (<memory: 30GB, vCores: 300>).
+    Resource capacity =
+        Resources.multiply(clusterResource, (queueA.getCapacity()/100));
+    // Limit is the actual resources available to "queueA". The following
+    // simulates the case where a second queue ("queueB") has "borrowed" almost
+    // all of "queueA"'s resources because "queueB" has a max capacity of 100%
+    // and has gone well over its guaranteed capacity. In this case, "queueB"
+    // has used 99GB of memory and used 505 vCores. This is to make vCores
+    // dominant in the calculations for the available resources.
+    when(queueA.getEffectiveCapacity(any())).thenReturn(capacity);
+    Resource limit = Resource.newInstance(1024, 495);
+    ResourceLimits currentResourceLimits =
+        new ResourceLimits(limit, Resources.none());
+    queueA.updateClusterResource(clusterResource, currentResourceLimits);
+    Resource expectedAmLimit = Resources.multiply(capacity,
+        queueA.getMaxAMResourcePerQueuePercent());
+    Resource amLimit = queueA.calculateAndGetAMResourceLimit();
+    assertTrue("AM memory limit is less than expected: Expected: " +
+        expectedAmLimit.getMemorySize() + "; Computed: "
+        + amLimit.getMemorySize(),
+        amLimit.getMemorySize() >= expectedAmLimit.getMemorySize());
+    assertTrue("AM vCore limit is less than expected: Expected: " +
+        expectedAmLimit.getVirtualCores() + "; Computed: "
+        + amLimit.getVirtualCores(),
+        amLimit.getVirtualCores() >= expectedAmLimit.getVirtualCores());
+  }
 }