
YARN-5731. Preemption calculation is not accurate when reserved containers are present in queue. (wangda)

Change-Id: Ie8c5145f449582253dcf8c0f6f388e3660ab1c6b
(cherry picked from commit f2f09e2bb09d3ffbce7a0af32fc225d47dcab5d1)
Wangda Tan 7 years ago
parent
commit
4d5f2c402b

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java

@@ -44,12 +44,12 @@ public class FifoCandidatesSelector
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
 
-  FifoCandidatesSelector(
-      CapacitySchedulerPreemptionContext preemptionContext) {
+  FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
+      boolean includeReservedResource) {
     super(preemptionContext);
 
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, false);
+        preemptionContext, includeReservedResource);
   }
 
   @Override

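The new boolean is threaded straight through to PreemptableResourceCalculator. As a rough mental model, a flag like this changes which figure the ideal-allocation balance sees as "used" for a queue; the sketch below is illustrative only, with class and method names of our own invention, not code from the patch:

    // Illustrative only: how an includeReservedResource-style flag can change
    // the "used" figure that ideal-allocation balancing sees for a queue.
    class IdealAllocationSketch {
      private final boolean includeReservedResource;

      IdealAllocationSketch(boolean includeReservedResource) {
        this.includeReservedResource = includeReservedResource;
      }

      /** Resource a queue is charged with when capacities are balanced. */
      long effectiveUsedGB(long usedGB, long reservedGB) {
        // Flag off (YARN-4390 default): reservations are invisible to the
        // balance, so preemption may never trigger on a fragmented cluster.
        // Flag on (this patch): reservations count as used (YARN-5731).
        return includeReservedResource ? usedGB + reservedGB : usedGB;
      }
    }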
+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -222,8 +222,13 @@ public class ProportionalCapacityPreemptionPolicy
           .add(new ReservedContainerCandidatesSelector(this));
     }
 
+    boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
+        CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
+
     // initialize candidates preemption selection policies
-    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
+        additionalPreemptionBasedOnReservedResource));
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(

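For context, this init code builds an ordered chain of candidate selectors: the YARN-4390 reserved-container selector (when enabled) runs first, and the FIFO selector now carries the new flag. A paraphrase of the assembly, with names outside the visible diff approximated rather than verbatim:

    // Paraphrased from the policy's init path; not a verbatim excerpt.
    static List<PreemptionCandidatesSelector> buildSelectors(
        ProportionalCapacityPreemptionPolicy policy,
        boolean selectBasedOnReservedContainers,
        boolean additionalPreemptionBasedOnReservedResource) {
      List<PreemptionCandidatesSelector> selectors = new ArrayList<>();
      if (selectBasedOnReservedContainers) {
        // Pre-existing YARN-4390 selector: frees exactly the space that is
        // blocking outstanding reservations.
        selectors.add(new ReservedContainerCandidatesSelector(policy));
      }
      // This patch: the FIFO selector takes the extra flag so its
      // ideal-allocation pass can also account for reserved resources.
      selectors.add(new FifoCandidatesSelector(policy,
          additionalPreemptionBasedOnReservedResource));
      return selectors;
    }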
+ 27 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -1102,6 +1102,33 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
       0.2f;
 
+  /**
+   * By default, reserved resources are excluded while balancing capacities
+   * of queues.
+   *
+   * Why? In YARN-4390, we added preemption-based-on-reserved-container
+   * support to reduce unnecessary preemption of large containers: reserved
+   * resources are not included while calculating the ideal allocation in
+   * FifoCandidatesSelector.
+   *
+   * The changes in YARN-4390 significantly reduce the number of containers
+   * preempted when the cluster has heterogeneous container requests. (See the
+   * test report:
+   * https://issues.apache.org/jira/secure/attachment/12796197/YARN-4390-test-results.pdf)
+   *
+   * However, in some corner cases, especially on a fragmented cluster, this
+   * can prevent preemption from kicking in at all. See YARN-5731.
+   *
+   * To address that, this change makes the behavior configurable. Please
+   * note that it is an experimental option.
+   */
+  public static final String
+      ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS =
+      PREEMPTION_CONFIG_PREFIX
+          + "additional_res_balance_based_on_reserved_containers";
+  public static final boolean
+      DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS = false;
+
   /**
    * When calculating which containers to preempt, we will try to preempt
    * containers for reserved containers first. This is false by default.

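In this branch PREEMPTION_CONFIG_PREFIX resolves to yarn.resourcemanager.monitor.capacity.preemption., so the new key should come out as yarn.resourcemanager.monitor.capacity.preemption.additional_res_balance_based_on_reserved_containers; worth re-checking against the source if the prefix ever moves. Enabling the experimental option from code, as the new test below does:

    // Enable the experimental option before the scheduler/policy is built.
    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
    csConf.setBoolean(
        CapacitySchedulerConfiguration
            .ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
        true);
    // The same switch can go into capacity-scheduler.xml under the resolved
    // key above; it defaults to false.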
+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java

@@ -129,9 +129,10 @@ public class CapacitySchedulerPreemptionTestBase {
   public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
       ApplicationAttemptId appId, int expected) throws InterruptedException {
     int waitNum = 0;
+    int total = 0;
 
     while (waitNum < 500) {
-      int total = 0;
+      total = 0;
       for (RMContainer c : node.getCopiedListOfRunningContainers()) {
         if (c.getApplicationAttemptId().equals(appId)) {
           total++;
@@ -144,6 +145,8 @@ public class CapacitySchedulerPreemptionTestBase {
       waitNum++;
     }
 
-    Assert.fail();
+    Assert.fail(
+        "Check #live-container-on-node-from-app, actual=" + total + " expected="
+            + expected);
   }
 }

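The change above hoists total out of the polling loop so the final Assert.fail can report the last observed count instead of failing without context. The same pattern generalizes; the helper below is our sketch (name and signature are ours, assuming JUnit 4 on the classpath), not part of the test base:

    // Hypothetical helper: poll an int-valued probe and fail with the last
    // observed value on timeout.
    static void waitForValue(java.util.function.IntSupplier actual,
        int expected, int maxTries, long sleepMs) throws InterruptedException {
      int last = 0;
      for (int i = 0; i < maxTries; i++) {
        last = actual.getAsInt();
        if (last == expected) {
          return;
        }
        Thread.sleep(sleepMs);
      }
      org.junit.Assert.fail(
          "timed out, actual=" + last + " expected=" + expected);
    }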
+ 98 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -38,6 +39,7 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class TestCapacitySchedulerSurgicalPreemption
     extends CapacitySchedulerPreemptionTestBase {
@@ -243,4 +245,100 @@ public class TestCapacitySchedulerSurgicalPreemption
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testPreemptionForFragmentedCluster() throws Exception {
+    // Set additional_res_balance_based_on_reserved_containers to true to get
+    // additional preemptions.
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
+        true);
+
+    /**
+     * Two queues, a and b, each with 50% capacity.
+     * 5 nodes in the cluster, each with 30G.
+     *
+     * Submit the first app to a: AM = 3G, plus 4 * 21G containers.
+     * Submit the second app to b: AM = 3G, plus 4 * 21G containers.
+     *
+     * We expect one container to be preempted from the 1st app.
+     */
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+    conf.setLong(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        1024 * 21);
+    conf.setQueues("root", new String[] { "a", "b" });
+    conf.setCapacity("root.a", 50);
+    conf.setUserLimitFactor("root.a", 100);
+    conf.setCapacity("root.b", 50);
+    conf.setUserLimitFactor("root.b", 100);
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      nms.add(rm1.registerNode("h" + i + ":1234", 30 * GB));
+    }
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+    // Launch an app to queue a; the AM container lands on the first node.
+    RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nms.get(0));
+
+    am1.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App1 should have 5 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+    // Launch an app to queue b; the AM container lands on the third node.
+    RMApp app2 = rm1.submitApp(3 * GB, "app", "user", null, "b");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nms.get(2));
+
+    am2.allocate("*", 21 * GB, 4, new ArrayList<ContainerId>());
+
+    // Do allocation for all nodes
+    for (int i = 0; i < 10; i++) {
+      MockNM mockNM = nms.get(i % nms.size());
+      RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+    }
+
+    // App2 should have 2 containers now
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        am2.getApplicationAttemptId());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice so the policy can both select and enforce
+    // preemption, then keep allocating until the freed space is used.
+    SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 3 && tick < 10) {
+      // Do allocation for all nodes
+      for (int i = 0; i < 10; i++) {
+        MockNM mockNM = nms.get(i % nms.size());
+        RMNode rmNode = cs.getRMContext().getRMNodes().get(mockNM.getNodeId());
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+      }
+      tick++;
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(3, schedulerApp2.getLiveContainers().size());
+
+    rm1.close();
+  }
 }
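Why a single preemption is enough in this test: a back-of-envelope check of the cluster layout it sets up (illustrative arithmetic, not part of the patch):

    // 5 nodes x 30G = 150G total; root.a and root.b are guaranteed 75G each.
    int app1Used = 3 + 4 * 21;                     // 87G in root.a
    int app2Used = 3 + 1 * 21;                     // 24G in root.b
    int totalFree = 5 * 30 - app1Used - app2Used;  // 39G free overall
    // The 39G is fragmented across nodes, none of which keeps >= 21G free,
    // so app2's reserved 21G container cannot start. Counting the
    // reservation in the balance calculation lets the policy preempt one
    // 21G container from app1 (87G -> 66G); the reservation then fits and
    // app2 ends with 3 live containers, matching the final assertion.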