Browse Source

YARN-11801: NPE in FifoCandidatesSelector.selectCandidates when preempting resources for an auto-created queue without child queues (#7607)

Susheel Gupta 4 days ago
parent
commit
7c7adefb65

+ 9 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -40,8 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .ManagedParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -430,12 +429,14 @@ public class ProportionalCapacityPreemptionPolicy
   }
 
   private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
-    // Also exclude ParentQueues, which might be without children
-    if (CollectionUtils.isEmpty(q.children)
-        && !(q.parentQueue instanceof ManagedParentQueue)
-        && (q.parentQueue == null
-        || !q.parentQueue.isEligibleForAutoQueueCreation())) {
-      return ImmutableSet.of(q.queueName);
+    // Only consider this a leaf queue if:
+    // It is a concrete leaf queue (not a childless parent)
+    if (CollectionUtils.isEmpty(q.children)) {
+      CSQueue queue = scheduler.getQueue(q.queueName);
+      if (queue instanceof AbstractLeafQueue) {
+        return ImmutableSet.of(q.queueName);
+      }
+      return Collections.emptySet();
     }
 
     Set<String> leafQueueNames = new HashSet<>();

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java

@@ -552,6 +552,15 @@ public abstract class AbstractParentQueue extends AbstractCSQueue {
     return isDynamicQueue() || queueContext.getConfiguration().
         isAutoQueueCreationV2Enabled(getQueuePathObject());
   }
+  /**
+   * Check whether this queue supports legacy(v1) dynamic child queue creation.
+   * @return true if queue is eligible to create child queues dynamically using
+   * the legacy system, false otherwise
+   */
+  public boolean isEligibleForLegacyAutoQueueCreation() {
+    return isDynamicQueue() || queueContext.getConfiguration().
+        isAutoCreateChildQueueEnabled(getQueuePathObject());
+  }
 
   @Override
   public void reinitialize(CSQueue newlyParsedQueue,

+ 64 - 30
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -1083,44 +1083,74 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   @Test
-  public void testLeafQueueNameExtraction() throws Exception {
-    ProportionalCapacityPreemptionPolicy policy =
-        buildPolicy(Q_DATA_FOR_IGNORE);
+  public void testLeafQueueNameExtractionWithFlexibleAQC() throws Exception {
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE);
     ParentQueue root = (ParentQueue) mCS.getRootQueue();
+
     root.addDynamicParentQueue("childlessFlexible");
+    ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicParent", true);
+    extendRootQueueWithMock(root, dynamicParent);
+
+    policy.editSchedule();
+    assertFalse(policy.getLeafQueueNames().contains( "root.dynamicParent"),
+            "root.dynamicLegacyParent" + " should not be a LeafQueue candidate");
+  }
+
+  @Test
+  public void testLeafQueueNameExtractionWithLegacyAQC() throws Exception {
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(Q_DATA_FOR_IGNORE);
+    ParentQueue root = (ParentQueue) mCS.getRootQueue();
+
+    root.addDynamicParentQueue("childlessLegacy");
+    ParentQueue dynamicParent = setupDynamicParentQueue("root.dynamicLegacyParent", false);
+    extendRootQueueWithMock(root, dynamicParent);
+
+    policy.editSchedule();
+    assertFalse(policy.getLeafQueueNames().contains( "root.dynamicLegacyParent"),
+            "root.dynamicLegacyParent" + " should not be a LeafQueue candidate");
+  }
+
+  private ParentQueue setupDynamicParentQueue(String queuePath, boolean isFlexible) {
+    ParentQueue dynamicParent = mockParentQueue(null, 0, new LinkedList<>());
+    mockQueueFields(dynamicParent, queuePath);
+
+    if (isFlexible) {
+      when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true);
+    } else {
+      when(dynamicParent.isEligibleForLegacyAutoQueueCreation()).thenReturn(true);
+    }
+
+    return dynamicParent;
+  }
+
+  private void extendRootQueueWithMock(ParentQueue root, ParentQueue mockQueue) {
     List<CSQueue> queues = root.getChildQueues();
     ArrayList<CSQueue> extendedQueues = new ArrayList<>();
-    LinkedList<ParentQueue> pqs = new LinkedList<>();
-    ParentQueue dynamicParent = mockParentQueue(
-        null, 0, pqs);
-    when(dynamicParent.getQueuePath()).thenReturn("root.dynamicParent");
-    when(dynamicParent.getQueueCapacities()).thenReturn(
-        new QueueCapacities(false));
-    QueueResourceQuotas dynamicParentQr = new QueueResourceQuotas();
-    dynamicParentQr.setEffectiveMaxResource(Resource.newInstance(1, 1));
-    dynamicParentQr.setEffectiveMinResource(Resources.createResource(1));
-    dynamicParentQr.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL,
-        Resource.newInstance(1, 1));
-    dynamicParentQr.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL,
-        Resources.createResource(1));
-    when(dynamicParent.getQueueResourceQuotas()).thenReturn(dynamicParentQr);
-    when(dynamicParent.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL))
-        .thenReturn(Resources.createResource(1));
-    when(dynamicParent.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL))
-        .thenReturn(Resource.newInstance(1, 1));
-    ResourceUsage resUsage = new ResourceUsage();
-    resUsage.setUsed(Resources.createResource(1024));
-    resUsage.setReserved(Resources.createResource(1024));
-    when(dynamicParent.getQueueResourceUsage()).thenReturn(resUsage);
-    when(dynamicParent.isEligibleForAutoQueueCreation()).thenReturn(true);
-    extendedQueues.add(dynamicParent);
+    extendedQueues.add(mockQueue);
     extendedQueues.addAll(queues);
     when(root.getChildQueues()).thenReturn(extendedQueues);
+  }
 
-    policy.editSchedule();
+  private void mockQueueFields(ParentQueue queue, String queuePath) {
+    when(queue.getQueuePath()).thenReturn(queuePath);
+    when(queue.getQueueCapacities()).thenReturn(new QueueCapacities(false));
+
+    QueueResourceQuotas qrq = new QueueResourceQuotas();
+    qrq.setEffectiveMaxResource(Resource.newInstance(1, 1));
+    qrq.setEffectiveMinResource(Resources.createResource(1));
+    qrq.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL, Resource.newInstance(1, 1));
+    qrq.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, Resources.createResource(1));
 
-    assertFalse(policy.getLeafQueueNames().contains("root.dynamicParent"),
-        "dynamicParent should not be a LeafQueue candidate");
+    when(queue.getQueueResourceQuotas()).thenReturn(qrq);
+    when(queue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL))
+        .thenReturn(Resources.createResource(1));
+    when(queue.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL))
+        .thenReturn(Resource.newInstance(1, 1));
+
+    ResourceUsage usage = new ResourceUsage();
+    usage.setUsed(Resources.createResource(1024));
+    usage.setReserved(Resources.createResource(1024));
+    when(queue.getQueueResourceUsage()).thenReturn(usage);
   }
 
   static class IsPreemptionRequestFor
@@ -1369,6 +1399,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       Resource[] used, Resource[] pending, Resource[] reserved, int[] apps,
       Resource[] gran) {
     LeafQueue lq = mock(LeafQueue.class);
+
+    String queuePath = p.getQueuePath() + ".queue" + (char)('A' + i - 1);
+    when(mCS.getQueue(queuePath)).thenReturn(lq);
+
     ResourceCalculator rc = mCS.getResourceCalculator();
     List<ApplicationAttemptId> appAttemptIdList = 
         new ArrayList<ApplicationAttemptId>();