
YARN-11573. Add config option to make container allocation prefer nodes without reserved containers (#6098)

Szilard Nemeth · 1 year ago · commit 13c5825c00

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java

@@ -107,4 +107,5 @@ public class ActivityDiagnosticConstant {
   public final static String
       NODE_CAN_NOT_FIND_CONTAINER_TO_BE_UNRESERVED_WHEN_NEEDED =
       "Node can't find a container to be unreserved when needed";
+  public static final String NODE_HAS_BEEN_RESERVED = "Node has been reserved";
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -154,6 +154,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
 
+  public static final String SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = PREFIX
+      + "skip-allocate-on-nodes-with-reserved-containers";
+
+  @Private
+  public static final boolean DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS = false;
+
   @Private
   public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
 
@@ -938,6 +944,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_RESERVE_CONT_LOOK_ALL_NODES);
   }
 
+  public boolean getSkipAllocateOnNodesWithReservedContainer() {
+    return getBoolean(SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
+        DEFAULT_SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS);
+  }
+
   private static String getAclKey(QueueACL acl) {
     return "acl_" + StringUtils.toLowerCase(acl.toString());
   }
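
For reference, a minimal sketch of toggling the new option programmatically (setBoolean is inherited from Hadoop's Configuration; since PREFIX is the standard "yarn.scheduler.capacity." prefix, the full property name resolves to yarn.scheduler.capacity.skip-allocate-on-nodes-with-reserved-containers):

    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    // Enable skipping of nodes that already hold a reserved container
    csConf.setBoolean(
        CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
        true);
    boolean skip = csConf.getSkipAllocateOnNodesWithReservedContainer(); // now true; default is false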

+ 30 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -24,8 +24,11 @@ import java.util.List;
 import java.util.Optional;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityLevel;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.DiagnosticsCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -850,9 +853,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
         candidates);
+
     while (iter.hasNext()) {
       FiCaSchedulerNode node = iter.next();
 
+      // Do not schedule if there are any reservations to fulfill on the node
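+      // (the iter.hasNext() guard keeps the last candidate node eligible even
+      // when it is reserved, so the request is never skipped on every node)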
+      if (iter.hasNext() &&
+          node.getReservedContainer() != null &&
+          isSkipAllocateOnNodesWithReservedContainer()) {
+        LOG.debug("Skipping scheduling on node {} since it has already been"
+                + " reserved by {}", node.getNodeID(),
+            node.getReservedContainer().getContainerId());
+        ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
+            activitiesManager, node, application, schedulerKey,
+            ActivityDiagnosticConstant.NODE_HAS_BEEN_RESERVED, ActivityLevel.NODE);
+        continue;
+      }
+
       if (reservedContainer == null) {
         result = preCheckForNodeCandidateSet(node,
             schedulingMode, resourceLimits, schedulerKey);
@@ -894,7 +911,19 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     return result;
   }
-  
+
+  private boolean isSkipAllocateOnNodesWithReservedContainer() {
+    ResourceScheduler scheduler = rmContext.getScheduler();
+    boolean skipAllocateOnNodesWithReservedContainer = false;
+    if (scheduler instanceof CapacityScheduler) {
+      CapacityScheduler cs = (CapacityScheduler) scheduler;
+      CapacitySchedulerConfiguration csConf = cs.getConfiguration();
+      skipAllocateOnNodesWithReservedContainer =
+          csConf.getSkipAllocateOnNodesWithReservedContainer();
+    }
+    return skipAllocateOnNodesWithReservedContainer;
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
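
Note that isSkipAllocateOnNodesWithReservedContainer() falls back to false whenever the active scheduler is not a CapacityScheduler, so the skip logic above is a no-op for other schedulers as well as when the property is left at its default.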

+ 110 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMultiNodes.java

@@ -25,12 +25,16 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.Iterators;
 
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.slf4j.Logger;
@@ -478,4 +482,110 @@ public class TestCapacitySchedulerMultiNodes {
     rm.stop();
   }
 
+  @Test(timeout=30000)
+  public void testSkipAllocationOnNodeReservedByAnotherApp() throws Exception {
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    newConf.setInt(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME
+        + ".resource-based.sorting-interval.ms", 0);
+    newConf.setMaximumApplicationMasterResourcePerQueuePercent("root.default", 1.0f);
+    newConf.set(CapacitySchedulerConfiguration.SKIP_ALLOCATE_ON_NODES_WITH_RESERVED_CONTAINERS,
+        "true");
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("127.0.0.2:1235", 8 * GB);
+
+    // Launch an app in the default queue; the AM container should be launched on nm1
+    RMApp app1 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
+        .createWithMemory(5 * GB, rm1)
+        .withAppName("app")
+        .withUser("user")
+        .withQueue("default")
+        .build());
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // Launch another app in the default queue; the AM container should be launched on nm2
+    RMApp app2 = MockRMAppSubmitter.submit(rm1, MockRMAppSubmissionData.Builder
+        .createWithMemory(5 * GB, rm1)
+        .withAppName("app")
+        .withUser("user")
+        .withQueue("default")
+        .build());
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Request a 4 GB container for app1; with only 3 GB free on each node,
+    // the request can only be reserved.
+    am1.allocate("*", 4 * GB, 1, new ArrayList<>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify that exactly one node holds a reserved container.
+    Set<RMNode> reservedContainers = checkReservedContainers(cs,
+        rm1.getRMContext().getRMNodes(), 1);
+    Assert.assertEquals(1, reservedContainers.size());
+    RMNode nodeWithReservedContainer = reservedContainers.iterator().next();
+    LOG.debug("Reserved container on: {}", nodeWithReservedContainer);
+
+    // Move the reservation to nm1 for easier testing
+    if (nodeWithReservedContainer.getNodeID().getHost().startsWith("127.0.0.2")) {
+      moveReservation(cs, rm1, nm1, nm2, am1);
+    }
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+
+    // Make sure the child queue has available headroom
+    // (see RegularContainerAllocator#checkHeadroom); otherwise
+    // RegularContainerAllocator#preCheckForNodeCandidateSet would return
+    // ContainerAllocation.QUEUE_SKIPPED.
+    MockNM nm3 = rm1.registerNode("127.0.0.3:1235", 3 * GB);
+
+    // Request a container for app2; we expect it to be reserved on nm2,
+    // because nm1 is skipped while it holds a reservation for another app.
+    am2.allocate("*", 4 * GB, 1, new ArrayList<>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertNotNull(cs.getNode(nm2.getNodeId()).getReservedContainer());
+
+    rm1.close();
+  }
+
+  private static void moveReservation(CapacityScheduler cs,
+      MockRM rm1, MockNM nm1, MockNM nm2, MockAM am1) {
+    RMNode sourceNode = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    RMNode targetNode = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    SchedulerApplicationAttempt firstSchedulerAppAttempt =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp app = (FiCaSchedulerApp) firstSchedulerAppAttempt;
+    RMContainer reservedContainer = cs.getNode(sourceNode.getNodeID()).getReservedContainer();
+    LOG.debug("Moving reservation");
+    app.moveReservation(reservedContainer,
+        cs.getNode(sourceNode.getNodeID()), cs.getNode(targetNode.getNodeID()));
+  }
+
+  private static Set<RMNode> checkReservedContainers(CapacityScheduler cs,
+      ConcurrentMap<NodeId, RMNode> rmNodes, int expectedNumberOfContainers) {
+    Set<RMNode> result = new HashSet<>();
+    for (Map.Entry<NodeId, RMNode> entry : rmNodes.entrySet()) {
+      if (cs.getNode(entry.getKey()).getReservedContainer() != null) {
+        result.add(entry.getValue());
+      }
+    }
+
+    Assert.assertEquals(expectedNumberOfContainers, result.size());
+    return result;
+  }
 }
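
To exercise just this scenario, the new test can be run from the hadoop-yarn-server-resourcemanager module with Surefire's single-test filter, for example:

    mvn test -Dtest=TestCapacitySchedulerMultiNodes#testSkipAllocationOnNodeReservedByAnotherApp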