YARN-11798. Precheck request separately to avoid redundant node checks and optimize performance for global scheduler. (#7516) Contributed by Tao Yang.

* YARN-11798. Precheck request separately to avoid redundant node checks and optimize performance for global scheduler.

* Add node for recording scheduler activities in RegularContainerAllocator#preCheckRequest to fix UT.

Signed-off-by: Shilun Fan <slfan1989@apache.org>
Tao Yang, 2 months ago
commit a314a1d714
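
The essence of the patch: checks that depend only on the request, application and queue (pending ask, exclusivity, reservation-continue-looking, headroom) are split into preCheckRequest and run once per scheduling cycle, while the per-node partition/placement check moves into preCheckForNode inside the node loop. A simplified, self-contained sketch of that split follows; the types used are hypothetical stand-ins, not the real RegularContainerAllocator API.

  import java.util.List;

  // Simplified sketch of the allocator flow after this change.
  // Request, Node and Result are hypothetical stand-ins, not YARN classes.
  class PrecheckSketch {
    enum Result { ALLOCATED, PRIORITY_SKIPPED }

    record Request(String partition, int pendingAsks, long askMemory) { }
    record Node(String partition, long availableMemory) { }

    Result allocate(Request request, List<Node> candidates, long headroom) {
      // Request-level prechecks: depend only on request/app/queue state,
      // so they are evaluated once per scheduling cycle (cf. preCheckRequest).
      if (request.pendingAsks() <= 0 || request.askMemory() > headroom) {
        return Result.PRIORITY_SKIPPED;
      }
      // Node-level prechecks: depend on the individual node, so they stay
      // inside the loop (cf. preCheckForNode: partition/placement checks).
      for (Node node : candidates) {
        if (!node.partition().equals(request.partition())
            || node.availableMemory() < request.askMemory()) {
          continue; // this node does not match, try the next candidate
        }
        return Result.ALLOCATED; // the real allocator would allocate/reserve here
      }
      return Result.PRIORITY_SKIPPED;
    }
  }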

+ 55 - 35
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -25,6 +25,7 @@ import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -100,15 +101,18 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
    * We will consider stuffs like exclusivity, pending resource, node partition,
    * headroom, etc.
    */
-  private ContainerAllocation preCheckForNodeCandidateSet(FiCaSchedulerNode node,
+  private ContainerAllocation preCheckRequest(
+      CandidateNodeSet<FiCaSchedulerNode> candidates,
       SchedulingMode schedulingMode, ResourceLimits resourceLimits,
       SchedulerRequestKey schedulerKey) {
     PendingAsk offswitchPendingAsk = application.getPendingAsk(schedulerKey,
         ResourceRequest.ANY);
+    SchedulerNode recordNode =
+        CandidateNodeSetUtils.getSingleNode(candidates);
 
     if (offswitchPendingAsk.getCount() <= 0) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-          activitiesManager, node, application, schedulerKey,
+          activitiesManager, recordNode, application, schedulerKey,
           ActivityDiagnosticConstant.REQUEST_DO_NOT_NEED_RESOURCE,
           ActivityLevel.REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
@@ -120,7 +124,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
     // Do we need containers at this 'priority'?
     if (application.getOutstandingAsksCount(schedulerKey) <= 0) {
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-          activitiesManager, node, application, schedulerKey,
+          activitiesManager, recordNode, application, schedulerKey,
           ActivityDiagnosticConstant.REQUEST_DO_NOT_NEED_RESOURCE,
           ActivityLevel.REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
@@ -130,12 +134,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       if (application.isWaitingForAMContainer() && !rmContext.getYarnConfiguration()
           .getBoolean(AM_ALLOW_NON_EXCLUSIVE_ALLOCATION, false)) {
         LOG.debug("Skip allocating AM container to app_attempt={},"
-            + " don't allow to allocate AM container in non-exclusive mode",
+                + " don't allow to allocate AM container in non-exclusive mode",
             application.getApplicationAttemptId());
         application.updateAppSkipNodeDiagnostics(
             "Skipping assigning to Node in Ignore Exclusivity mode. ");
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-            activitiesManager, node, application, schedulerKey,
+            activitiesManager, recordNode, application, schedulerKey,
             ActivityDiagnosticConstant.
                 REQUEST_SKIPPED_IN_IGNORE_EXCLUSIVITY_MODE,
             ActivityLevel.REQUEST);
@@ -143,37 +147,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
     }
 
-    // Is the nodePartition of pending request matches the node's partition
-    // If not match, jump to next priority.
-    Optional<DiagnosticsCollector> dcOpt = activitiesManager == null ?
-        Optional.empty() :
-        activitiesManager.getOptionalDiagnosticsCollector();
-    if (!appInfo.precheckNode(schedulerKey, node, schedulingMode, dcOpt)) {
-      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-          activitiesManager, node, application, schedulerKey,
-          ActivityDiagnosticConstant.
-              NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS
-              + ActivitiesManager.getDiagnostics(dcOpt),
-          ActivityLevel.NODE);
-      return ContainerAllocation.PRIORITY_SKIPPED;
-    }
-
     if (!application.getCSLeafQueue().isReservationsContinueLooking()) {
       if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
         LOG.debug("doesn't need containers based on reservation algo!");
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-            activitiesManager, node, application, schedulerKey,
+            activitiesManager, recordNode, application, schedulerKey,
             ActivityDiagnosticConstant.REQUEST_SKIPPED_BECAUSE_OF_RESERVATION,
             ActivityLevel.REQUEST);
         return ContainerAllocation.PRIORITY_SKIPPED;
       }
     }
 
-    if (!checkHeadroom(resourceLimits, required, node.getPartition())) {
+    if (!checkHeadroom(resourceLimits, required, candidates.getPartition())) {
       LOG.debug("cannot allocate required resource={} because of headroom",
           required);
       ActivitiesLogger.APP.recordAppActivityWithoutAllocation(
-          activitiesManager, node, application, schedulerKey,
+          activitiesManager, recordNode, application, schedulerKey,
           ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM,
           ActivityState.REJECTED,
           ActivityLevel.REQUEST);
@@ -190,7 +179,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       // This is possible when #pending resource decreased by a different
       // thread.
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-          activitiesManager, node, application, schedulerKey,
+          activitiesManager, recordNode, application, schedulerKey,
           ActivityDiagnosticConstant.REQUEST_SKIPPED_BECAUSE_NULL_ANY_REQUEST,
           ActivityLevel.REQUEST);
       return ContainerAllocation.PRIORITY_SKIPPED;
@@ -221,13 +210,38 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
               + rmContext.getScheduler().getNumClusterNodes());
         }
         ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
-            activitiesManager, node, application, schedulerKey,
+            activitiesManager, recordNode, application, schedulerKey,
             ActivityDiagnosticConstant.
                 REQUEST_SKIPPED_BECAUSE_NON_PARTITIONED_PARTITION_FIRST,
             ActivityLevel.REQUEST);
         return ContainerAllocation.APP_SKIPPED;
       }
     }
+    return null;
+  }
+
+  /*
+   * Pre-check if we can allocate a pending resource request
+   * (given schedulerKey) to a given node.
+   * We will consider stuffs like placement-constraints, etc.
+   */
+  private ContainerAllocation preCheckForNode(FiCaSchedulerNode node,
+      SchedulingMode schedulingMode, SchedulerRequestKey schedulerKey) {
+
+    // Is the nodePartition of pending request matches the node's partition
+    // If not match, jump to next priority.
+    Optional<DiagnosticsCollector> dcOpt = activitiesManager == null ?
+        Optional.empty() :
+        activitiesManager.getOptionalDiagnosticsCollector();
+    if (!appInfo.precheckNode(schedulerKey, node, schedulingMode, dcOpt)) {
+      ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+          activitiesManager, node, application, schedulerKey,
+          ActivityDiagnosticConstant.
+              NODE_DO_NOT_MATCH_PARTITION_OR_PLACEMENT_CONSTRAINTS
+              + ActivitiesManager.getDiagnostics(dcOpt),
+          ActivityLevel.NODE);
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
 
     return null;
   }
@@ -849,6 +863,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
           AllocationState.PRIORITY_SKIPPED);
     }
 
+    // pre-check request
+    if (reservedContainer == null) {
+      result = preCheckRequest(candidates,
+          schedulingMode, resourceLimits, schedulerKey);
+      if (null != result) {
+        return result;
+      }
+    } else {
+      // pre-check when allocating reserved container
+      if (application.getOutstandingAsksCount(schedulerKey) == 0) {
+        return new ContainerAllocation(reservedContainer, null,
+            AllocationState.QUEUE_SKIPPED);
+      }
+    }
+
     result = ContainerAllocation.PRIORITY_SKIPPED;
 
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
@@ -872,19 +901,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
       }
 
       if (reservedContainer == null) {
-        result = preCheckForNodeCandidateSet(node,
-            schedulingMode, resourceLimits, schedulerKey);
+        result = preCheckForNode(node, schedulingMode, schedulerKey);
         if (null != result) {
           continue;
         }
-      } else {
-        // pre-check when allocating reserved container
-        if (application.getOutstandingAsksCount(schedulerKey) == 0) {
-          // Release
-          result = new ContainerAllocation(reservedContainer, null,
-              AllocationState.QUEUE_SKIPPED);
-          continue;
-        }
       }
 
       result = tryAllocateOnNode(clusterResource, node, schedulingMode,

+ 110 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java

@@ -23,9 +23,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -36,11 +39,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
 
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -572,6 +582,106 @@ public class TestCapacitySchedulerMultiNodes {
     rm1.close();
   }
 
+  // In a scheduling scenario with 2000 candidate nodes and an unsatisfied
+  // request that reaches the headroom,
+  // currently the global scheduler checks the request for each node repeatedly,
+  // causing a scheduling cycle to exceed 200ms.
+  // With the improvement of YARN-11798, the global scheduler checks the request
+  // only once before evaluating all nodes, reducing the scheduling cycle to
+  // less than 2ms.
+  @Test
+  @Timeout(value = 30)
+  public void testCheckRequestOnceForUnsatisfiedRequest() throws Exception {
+    QueuePath defaultQueuePath =
+        new QueuePath(CapacitySchedulerConfiguration.ROOT, "default");
+    String resourceLimit = "[vcores=3,memory=3096]";
+    conf.setCapacity(defaultQueuePath, resourceLimit);
+    conf.setMaximumCapacityByLabel(defaultQueuePath, "", resourceLimit);
+    conf.setInt(YarnConfiguration.SCHEDULER_SKIP_NODE_MULTIPLIER, 1000);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = rm.registerNode("test:1234", 10 * GB);
+    ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+    waitforNMRegistered(scheduler, 1, 5);
+
+    MockRMAppSubmissionData data1 =
+        MockRMAppSubmissionData.Builder.createWithMemory(2048, rm)
+            .withAppName("app-1")
+            .withUser("user1")
+            .withAcls(null)
+            .withQueue("default")
+            .withUnmanagedAM(false)
+            .build();
+    RMApp app1 = MockRMAppSubmitter.submit(rm, data1);
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    SchedulerNodeReport reportNm1 =
+        rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+
+    // check node report
+    assertEquals(2 * GB, reportNm1.getUsedResource().getMemorySize());
+    assertEquals(8 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    // mock node tracker with 2000 nodes
+    // to simulate the scenario where there are many nodes in the cluster
+    List<FiCaSchedulerNode> mockNodes = new ArrayList<>();
+    long ss = System.currentTimeMillis();
+    for (int i = 0; i < 2000; i++) {
+      FiCaSchedulerNode node =
+          TestUtils.getMockNode("host" + i + ":1234", "", 0, 10 * GB, 10);
+      mockNodes.add(node);
+    }
+    ClusterNodeTracker<FiCaSchedulerNode> mockNodeTracker =
+        new ClusterNodeTracker<FiCaSchedulerNode>() {
+      @Override
+      public List<FiCaSchedulerNode> getNodesPerPartition(String partition) {
+        return mockNodes;
+      }
+    };
+
+    // replace the scheduler with a spy scheduler with mocked node-tracker
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    when(spyCs.getNodeTracker()).thenReturn(mockNodeTracker);
+    RMContext rmContext = rm.getRMContext();
+    ((RMContextImpl) rmContext).setScheduler(spyCs);
+
+    MultiNodeSortingManager<SchedulerNode> mns =
+        rmContext.getMultiNodeSortingManager();
+    MultiNodeSorter<SchedulerNode> sorter =
+        mns.getMultiNodePolicy(POLICY_CLASS_NAME);
+    sorter.reSortClusterNodes();
+
+    // verify that the number of nodes is correct
+    Set<SchedulerNode> nodes =
+        sorter.getMultiNodeLookupPolicy().getNodesPerPartition("");
+    assertEquals(mockNodes.size(), nodes.size());
+
+    // create an unsatisfied request which will reach the headroom
+    am1.allocate("*", 2 * GB, 10, new ArrayList<>());
+
+    // verify that when headroom is reached for an unsatisfied request,
+    // scheduler should only check the request once before checking all nodes.
+    CandidateNodeSet<FiCaSchedulerNode> candidates =
+        new SimpleCandidateNodeSet<>(Collections.emptyMap(), "");
+    int numSchedulingCycles = 10;
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < numSchedulingCycles; i++) {
+      spyCs.allocateContainersToNode(candidates, false);
+    }
+    long avgElapsedMs =
+        (System.currentTimeMillis() - startTime) / numSchedulingCycles;
+    LOG.info("Average elapsed time for a scheduling cycle: {} ms",
+        avgElapsedMs);
+    // verify that the scheduling cycle is less than 5ms,
+    // ideally the latency should be less than 2ms.
+    assertTrue(avgElapsedMs < 5,
+        String.format("%d ms elapsed in average for a scheduling cycle, " +
+            "expected to be less than 5ms.", avgElapsedMs));
+
+    rm.stop();
+  }
+
   private static void moveReservation(CapacityScheduler cs,
       MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
     RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
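
Rough arithmetic behind the timings quoted in the test comment above (an estimate, not a measurement from this patch): with N candidate nodes, the old flow evaluated the request-level prechecks on every node, so a scheduling cycle cost roughly N * (c_request + c_node); the new flow costs roughly c_request + N * c_node, where c_request and c_node are the per-evaluation costs of the request-level and node-level checks. With N = 2000 and cycles above 200 ms before the change, the redundant request-level work amounts to on the order of 0.1 ms per node, and removing it is consistent with the sub-2 ms cycles the test asserts after the change.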