Bläddra i källkod

YARN-6750. Add a configuration to cap how much a NM can be overallocated. Contributed by Haibo Chen.

Miklos Szegedi 7 år sedan
förälder
incheckning
6d028d77ca

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -398,6 +398,11 @@ public class YarnConfiguration extends Configuration {
   /** ACL used in case none is found. Allows nothing. */
   public static final String DEFAULT_YARN_APP_ACL = " ";
 
+  /** Cluster-wide max overallocation per node, as a ratio of its capacity. */
+  public static final String PER_NODE_MAX_OVERALLOCATION_RATIO =
+      RM_PREFIX + "overallocation.per-node-max-ratio";
+  public static final float DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO = 4.0f;
+
   /** Setting that controls whether opportunistic container allocation
    *  is enabled or not. */
   @Unstable

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -3340,6 +3340,16 @@
     <value>10</value>
   </property>
 
+  <property>
+    <description>
+    The maximum amount of resources, specified as a ratio to node capacity,
+    that can be allocated to opportunistic containers on any given node in
+    the cluster.
+    </description>
+    <name>yarn.resourcemanager.overallocation.per-node-max-ratio</name>
+    <value>4.0</value>
+  </property>
+
   <property>
     <description>
     Frequency for computing least loaded NMs.

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -119,6 +119,7 @@ public abstract class AbstractYarnScheduler
       new ClusterNodeTracker<>();
 
   protected Resource minimumAllocation;
+  protected float maxOverAllocationRatioPerNode;
 
   protected volatile RMContext rmContext;
 
@@ -201,6 +202,9 @@ public abstract class AbstractYarnScheduler
     nodeTracker.setConfiguredMaxAllocationWaitTime(
         configuredMaximumAllocationWaitTime);
     maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
+    maxOverAllocationRatioPerNode = conf.getFloat(
+        YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
     createReleaseCache();
     autoUpdateContainers =
         conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,

+ 36 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -94,6 +94,10 @@ public abstract class SchedulerNode {
   protected Resource resourceAllocatedPendingLaunch =
       Resource.newInstance(0, 0);
 
+  // The max amount of resources that can be allocated to opportunistic
+  // containers on the node, specified as a ratio to its capacity
+  private final float maxOverAllocationRatio;
+
   private volatile Set<String> labels = null;
 
   private volatile Set<NodeAttribute> nodeAttributes = null;
@@ -102,7 +106,7 @@ public abstract class SchedulerNode {
   private volatile long lastHeartbeatMonotonicTime;
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
-      Set<String> labels) {
+      Set<String> labels, float maxOverAllocationRatio) {
     this.rmNode = node;
     this.rmContext = node.getRMContext();
     this.unallocatedResource = Resources.clone(node.getTotalCapability());
@@ -114,10 +118,24 @@ public abstract class SchedulerNode {
     }
     this.labels = ImmutableSet.copyOf(labels);
     this.lastHeartbeatMonotonicTime = Time.monotonicNow();
+    this.maxOverAllocationRatio = maxOverAllocationRatio;
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      Set<String> labels) {
+    this(node, usePortForNodeName, labels,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+  }
+
+  public SchedulerNode(RMNode node, boolean usePortForNodeName,
+      float maxOverAllocationRatio) {
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+        maxOverAllocationRatio);
   }
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
-    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET);
+    this(node, usePortForNodeName, CommonNodeLabelsManager.EMPTY_STRING_SET,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
   }
 
   public RMNode getRMNode() {
@@ -671,9 +689,11 @@ public abstract class SchedulerNode {
 
   /**
    * Get the amount of resources that can be allocated to opportunistic
-   * containers in the case of overallocation. It is calculated as
+   * containers in the case of overallocation, calculated as
    * node capacity - (node utilization + resources of allocated-yet-not-started
-   * containers).
+   * containers), subject to the maximum amount of resources that can be
+   * allocated to opportunistic containers on the node specified as a ratio to
+   * its capacity.
    * @return the amount of resources that are available to be allocated to
    *         opportunistic containers
    */
@@ -706,11 +726,21 @@ public abstract class SchedulerNode {
     Resource resourceAllowedForOpportunisticContainers =
         Resources.createResource(allowedMemory, allowedCpu);
 
-    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
-    // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+    // cap the total amount of resources allocated to OPPORTUNISTIC containers
+    Resource maxOverallocation = getMaxOverallocationAllowed();
+    Resources.subtractFrom(maxOverallocation, allocatedResourceOpportunistic);
+    resourceAllowedForOpportunisticContainers = Resources.componentwiseMin(
+        maxOverallocation, resourceAllowedForOpportunisticContainers);
+
     return resourceAllowedForOpportunisticContainers;
   }
 
+  private Resource getMaxOverallocationAllowed() {
+    long maxMemory = (long) (capacity.getMemorySize() * maxOverAllocationRatio);
+    int maxVcore = (int) (capacity.getVirtualCores() * maxOverAllocationRatio);
+    return Resource.newInstance(maxMemory, maxVcore);
+  }
+
   private static class ContainerInfo {
     private final RMContainer container;
     private boolean launchedOnNode;

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -66,10 +66,15 @@ public class FSSchedulerNode extends SchedulerNode {
   // slated for preemption
   private Resource totalResourcesPreempted = Resource.newInstance(0, 0);
 
+  @VisibleForTesting
   public FSSchedulerNode(RMNode node, boolean usePortForNodeName) {
     super(node, usePortForNodeName);
   }
 
+  public FSSchedulerNode(RMNode node, boolean usePortForNodeName,
+      float maxOverallocationRatio) {
+    super(node, usePortForNodeName, maxOverallocationRatio);
+  }
   /**
    * Total amount of reserved resources including reservations and preempted
    * containers.

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -774,7 +774,7 @@ public class FairScheduler extends
     writeLock.lock();
     try {
       FSSchedulerNode schedulerNode = new FSSchedulerNode(node,
-          usePortForNodeName);
+          usePortForNodeName, maxOverAllocationRatioPerNode);
       nodeTracker.addNode(schedulerNode);
 
       triggerUpdate();

+ 107 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -3143,6 +3143,113 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     }
   }
 
+  /**
+   * Test that max overallocation per node is enforced by Fair Scheduler.
+   * @throws Exception
+   */
+  @Test
+  public void testMaxOverallocationPerNode() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    float maxOverallocationRatio = conf.getFloat(
+        YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+        YarnConfiguration.DEFAULT_PER_NODE_MAX_OVERALLOCATION_RATIO);
+    conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO, 1.5f);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 1G of memory and 1 vcore, and an overallocation
+      // threshold of 1.0f for both memory and cpu
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(1f, 1f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(1024, 1), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the whole node
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(1024, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is zero after the container runs
+      ContainerStatus containerStatus1 = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus1),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create a scheduling request that should get allocated an OPPORTUNISTIC
+      // container because the node utilization is zero
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // node utilization is still zero after the container runs
+      ContainerStatus containerStatus2 = ContainerStatus.newInstance(
+          allocatedContainers2.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus2),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(0, 0, 0.0f));
+
+      // create another scheduling request that should not get any allocation
+      // because the max overallocation on the node would be exceeded.
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 0);
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+      conf.setFloat(YarnConfiguration.PER_NODE_MAX_OVERALLOCATION_RATIO,
+          maxOverallocationRatio);
+    }
+  }
+
   @Test
   public void testAclSubmitApplication() throws Exception {
     // Set acl's