Browse Source

YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting resources when cluster is free (Karthik Kambatla via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1597209 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 years ago
parent
commit
a00b2d4f37

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -104,6 +104,9 @@ Release 2.5.0 - UNRELEASED
 
     YARN-2059. Added admin ACLs support to Timeline Server. (Zhijie Shen via
     vinodkv)
+    
+    YARN-2073. Fair Scheduler: Add a utilization threshold to prevent preempting
+    resources when cluster is free (Karthik Kambatla via Sandy Ryza)
 
   OPTIMIZATIONS
 

+ 26 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -148,7 +148,11 @@ public class FairScheduler extends
   // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-  // How often tasks are preempted 
+  // Preemption related variables
+  protected boolean preemptionEnabled;
+  protected float preemptionUtilizationThreshold;
+
+  // How often tasks are preempted
   protected long preemptionInterval; 
   
   // ms to wait before force killing stuff (must be longer than a couple
@@ -158,7 +162,6 @@ public class FairScheduler extends
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
   
-  protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -318,7 +321,7 @@ public class FairScheduler extends
    * and then select the right ones using preemptTasks.
    */
   protected synchronized void preemptTasksIfNecessary() {
-    if (!preemptionEnabled) {
+    if (!shouldAttemptPreemption()) {
       return;
     }
 
@@ -328,10 +331,9 @@ public class FairScheduler extends
     }
     lastPreemptCheckTime = curTime;
 
-    Resource resToPreempt = Resources.none();
-
+    Resource resToPreempt = Resources.clone(Resources.none());
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
+      Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
     }
     if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
         Resources.none())) {
@@ -1067,6 +1069,22 @@ public class FairScheduler extends
             clusterResource, rootMetrics.getAllocatedResources()));
   }
 
+  /**
+   * Check if preemption is enabled and the utilization threshold for
+   * preemption is met.
+   *
+   * Utilization is the larger of the memory and virtual-core ratios of
+   * allocated resources to total cluster capacity, so preemption is only
+   * attempted once the cluster is busier than the configured threshold.
+   *
+   * @return true if preemption should be attempted, false otherwise.
+   */
+  private boolean shouldAttemptPreemption() {
+    if (preemptionEnabled) {
+      // Compare against usage (allocated / capacity), not free capacity
+      // (available / capacity): the threshold is a utilization threshold.
+      return (preemptionUtilizationThreshold < Math.max(
+          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
+          (float) rootMetrics.getAllocatedVirtualCores() /
+              clusterResource.getVirtualCores()));
+    }
+    return false;
+  }
+
   @Override
   public QueueMetrics getRootQueueMetrics() {
     return rootMetrics;
@@ -1172,6 +1190,8 @@ public class FairScheduler extends
       nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
       rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
       preemptionEnabled = this.conf.getPreemptionEnabled();
+      preemptionUtilizationThreshold =
+          this.conf.getPreemptionUtilizationThreshold();
       assignMultiple = this.conf.getAssignMultiple();
       maxAssign = this.conf.getMaxAssign();
       sizeBasedWeight = this.conf.getSizeBasedWeight();

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -101,6 +101,10 @@ public class FairSchedulerConfiguration extends Configuration {
   /** Whether preemption is enabled. */
   protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
   protected static final boolean DEFAULT_PREEMPTION = false;
+
+  protected static final String PREEMPTION_THRESHOLD =
+      CONF_PREFIX + "preemption.cluster-utilization-threshold";
+  protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
   
   protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
   protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
@@ -185,6 +189,10 @@ public class FairSchedulerConfiguration extends Configuration {
     return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
   }
 
+  /**
+   * Returns the cluster utilization threshold used to gate preemption, read
+   * from the {@link #PREEMPTION_THRESHOLD} property with
+   * {@link #DEFAULT_PREEMPTION_THRESHOLD} as the default.
+   */
+  public float getPreemptionUtilizationThreshold() {
+    return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD);
+  }
+
   public boolean getAssignMultiple() {
     return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
   }

+ 172 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java

@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Clock;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class FairSchedulerTestBase {
+  /**
+   * Manually advanced clock for tests; time starts at 0 and only moves when
+   * {@link #tick(int)} is called.
+   */
+  protected static class MockClock implements Clock {
+    private long time = 0;
+
+    /** @return the current mock time in milliseconds. */
+    @Override
+    public long getTime() {
+      return time;
+    }
+
+    /**
+     * Advances the clock.
+     *
+     * @param seconds number of seconds to move the clock forward by
+     */
+    public void tick(int seconds) {
+      // Multiply as long so large tick values cannot overflow int arithmetic.
+      time += seconds * 1000L;
+    }
+  }
+
+  protected final static String TEST_DIR =
+      new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
+
+  private static RecordFactory
+      recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  protected int APP_ID = 1; // Incrementing counter for scheduling apps
+  protected int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
+
+  protected Configuration conf;
+  protected FairScheduler scheduler;
+  protected ResourceManager resourceManager;
+
+  // Helper methods
+  /**
+   * Builds the base configuration shared by the fair scheduler tests:
+   * FairScheduler as the RM scheduler, a 0 MB minimum / 10240 MB maximum
+   * allocation with a 1024 MB increment, assign-multiple disabled and a
+   * preemption utilization threshold of 0.
+   *
+   * @return a new configuration; subclasses may override to add settings.
+   */
+  protected Configuration createConfiguration() {
+    Configuration conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        1024);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
+    // All tests assume only one assignment per node update
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    // NOTE(review): presumably 0 so that existing preemption tests are not
+    // gated by the utilization threshold - confirm.
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+    return conf;
+  }
+
+  protected ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
+    return ApplicationAttemptId.newInstance(appIdImpl, attemptId);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, String host, int priority, int numContainers,
+      boolean relaxLocality) {
+    return createResourceRequest(memory, 1, host, priority, numContainers,
+        relaxLocality);
+  }
+
+  protected ResourceRequest createResourceRequest(
+      int memory, int vcores, String host, int priority, int numContainers,
+      boolean relaxLocality) {
+    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
+    request.setCapability(BuilderUtils.newResource(memory, vcores));
+    request.setResourceName(host);
+    request.setNumContainers(numContainers);
+    Priority prio = recordFactory.newRecordInstance(Priority.class);
+    prio.setPriority(priority);
+    request.setPriority(prio);
+    request.setRelaxLocality(relaxLocality);
+    return request;
+  }
+
+  /**
+   * Creates a single container priority-1 request and submits to
+   * scheduler.
+   */
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId) {
+    return createSchedulingRequest(memory, queueId, userId, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers) {
+    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
+  }
+
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, String queueId, String userId, int numContainers, int priority) {
+    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
+        priority);
+  }
+
+  /**
+   * Registers a new application (and, if it was accepted, an attempt) with
+   * the scheduler and submits one resource request on its behalf.
+   *
+   * The request targets {@link ResourceRequest#ANY} with relaxed locality.
+   * Application and attempt ids come from the incrementing APP_ID and
+   * ATTEMPT_ID counters.
+   *
+   * @param memory memory of each requested container
+   * @param vcores virtual cores of each requested container
+   * @param queueId queue the application is added to
+   * @param userId user the application is added as
+   * @param numContainers number of containers requested
+   * @param priority priority of the request
+   * @return the id of the newly created application attempt
+   */
+  protected ApplicationAttemptId createSchedulingRequest(
+      int memory, int vcores, String queueId, String userId, int numContainers,
+      int priority) {
+    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    scheduler.addApplication(id.getApplicationId(), queueId, userId);
+    // This conditional is for testAclSubmitApplication where app is rejected
+    // and no app is added.
+    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
+      scheduler.addApplicationAttempt(id, false);
+    }
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
+        priority, numContainers, true);
+    ask.add(request);
+    // Empty release list; the last two arguments are passed as null.
+    scheduler.allocate(id, ask,  new ArrayList<ContainerId>(), null, null);
+    return id;
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+       int memory, int priority, ApplicationAttemptId attId) {
+    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+      int memory, int vcores, int priority, ApplicationAttemptId attId) {
+    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
+        priority, 1, true);
+    createSchedulingRequestExistingApplication(request, attId);
+  }
+
+  protected void createSchedulingRequestExistingApplication(
+      ResourceRequest request, ApplicationAttemptId attId) {
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ask.add(request);
+    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null);
+  }
+}

+ 4 - 141
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -55,7 +55,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -63,8 +62,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -94,7 +91,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -105,46 +101,14 @@ import org.xml.sax.SAXException;
 import com.google.common.collect.Sets;
 
 @SuppressWarnings("unchecked")
-public class TestFairScheduler {
+public class TestFairScheduler extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE =
+      new File(TEST_DIR, "test-queues").getAbsolutePath();
 
-  static class MockClock implements Clock {
-    private long time = 0;
-    @Override
-    public long getTime() {
-      return time;
-    }
-
-    public void tick(int seconds) {
-      time = time + seconds * 1000;
-    }
-
-  }
-
-  final static String TEST_DIR = new File(System.getProperty("test.build.data",
-      "/tmp")).getAbsolutePath();
-
-  final static String ALLOC_FILE = new File(TEST_DIR,
-      "test-queues").getAbsolutePath();
-
-  private FairScheduler scheduler;
-  private ResourceManager resourceManager;
-  private Configuration conf;
-  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  private int APP_ID = 1; // Incrementing counter for schedling apps
-  private int ATTEMPT_ID = 1; // Incrementing counter for scheduling attempts
-
-  // HELPER METHODS
   @Before
   public void setUp() throws IOException {
     scheduler = new FairScheduler();
     conf = createConfiguration();
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
-    conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
-      1024);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
-    // All tests assume only one assignment per node update
-    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
     resourceManager = new ResourceManager();
     resourceManager.init(conf);
 
@@ -198,107 +162,6 @@ public class TestFairScheduler {
     }
   }
 
-  private Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-        ResourceScheduler.class);
-    return conf;
-  }
-
-  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
-    ApplicationId appIdImpl = ApplicationId.newInstance(0, appId);
-    ApplicationAttemptId attId =
-        ApplicationAttemptId.newInstance(appIdImpl, attemptId);
-    return attId;
-  }
-  
-  private ResourceRequest createResourceRequest(int memory, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    return createResourceRequest(memory, 1, host, priority, numContainers,
-        relaxLocality);
-  }
-
-  private ResourceRequest createResourceRequest(int memory, int vcores, String host,
-      int priority, int numContainers, boolean relaxLocality) {
-    ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
-    request.setCapability(BuilderUtils.newResource(memory, vcores));
-    request.setResourceName(host);
-    request.setNumContainers(numContainers);
-    Priority prio = recordFactory.newRecordInstance(Priority.class);
-    prio.setPriority(priority);
-    request.setPriority(prio);
-    request.setRelaxLocality(relaxLocality);
-    return request;
-  }
-
-  /**
-   * Creates a single container priority-1 request and submits to
-   * scheduler.
-   */
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId) {
-    return createSchedulingRequest(memory, queueId, userId, 1);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers) {
-    return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers) {
-    return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1);
-  }
-
-  private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
-      String userId, int numContainers, int priority) {
-    return createSchedulingRequest(memory, 1, queueId, userId, numContainers,
-        priority);
-  }
-  
-  private ApplicationAttemptId createSchedulingRequest(int memory, int vcores,
-      String queueId, String userId, int numContainers, int priority) {
-    ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
-    scheduler.addApplication(id.getApplicationId(), queueId, userId);
-    // This conditional is for testAclSubmitApplication where app is rejected
-    // and no app is added.
-    if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
-      scheduler.addApplicationAttempt(id, false);
-    }
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-        priority, numContainers, true);
-    ask.add(request);
-    scheduler.allocate(id, ask,  new ArrayList<ContainerId>(), null, null);
-    return id;
-  }
-  
-  private void createSchedulingRequestExistingApplication(int memory, int priority,
-      ApplicationAttemptId attId) {
-    ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
-        priority, 1, true);
-    createSchedulingRequestExistingApplication(request, attId);
-  }
-  
-  private void createSchedulingRequestExistingApplication(int memory, int vcores,
-      int priority, ApplicationAttemptId attId) {
-	ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY,
-		priority, 1, true);
-	createSchedulingRequestExistingApplication(request, attId);
-  }
-  
-  private void createSchedulingRequestExistingApplication(ResourceRequest request,
-      ApplicationAttemptId attId) {
-    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
-    ask.add(request);
-    scheduler.allocate(attId, ask,  new ArrayList<ContainerId>(), null, null);
-  }
-
   // TESTS
 
   @Test(timeout=2000)
@@ -1455,7 +1318,7 @@ public class TestFairScheduler {
     assertEquals(
         1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
   }
-  
+
   @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() throws IOException {
     scheduler.reinitialize(conf, resourceManager.getRMContext());

+ 178 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java

@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE = new File(TEST_DIR,
+      TestFairSchedulerPreemption.class.getName() + ".xml").getAbsolutePath();
+
+  private MockClock clock;
+
+  private static class StubbedFairScheduler extends FairScheduler {
+    public int lastPreemptMemory = -1;
+
+    @Override
+    protected void preemptResources(
+        Collection<FSLeafQueue> scheds, Resource toPreempt) {
+      lastPreemptMemory = toPreempt.getMemory();
+    }
+
+    public void resetLastPreemptResources() {
+      lastPreemptMemory = -1;
+    }
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    return conf;
+  }
+
+  @Before
+  public void setup() throws IOException {
+    conf = createConfiguration();
+    clock = new MockClock();
+  }
+
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    conf = null;
+  }
+
+  private void startResourceManager(float utilizationThreshold) {
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
+        utilizationThreshold);
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+
+    assertTrue(
+        resourceManager.getResourceScheduler() instanceof StubbedFairScheduler);
+    scheduler = (FairScheduler)resourceManager.getResourceScheduler();
+
+    scheduler.setClock(clock);
+    scheduler.UPDATE_INTERVAL = 60 * 1000;
+  }
+
+  private void registerNodeAndSubmitApp(
+      int memory, int vcores, int appContainers, int appMemory) {
+    RMNode node1 = MockNodes.newNodeInfo(
+        1, Resources.createResource(memory, vcores), 1, "node1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    assertEquals("Incorrect amount of resources in the cluster",
+        memory, scheduler.rootMetrics.getAvailableMB());
+    assertEquals("Incorrect amount of resources in the cluster",
+        vcores, scheduler.rootMetrics.getAvailableVirtualCores());
+
+    createSchedulingRequest(appMemory, "queueA", "user1", appContainers);
+    scheduler.update();
+    // Sufficient node check-ins to fully schedule containers
+    for (int i = 0; i < 3; i++) {
+      NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+      scheduler.handle(nodeUpdate1);
+    }
+    assertEquals("app1's request is not met",
+        memory - appContainers * appMemory,
+        scheduler.rootMetrics.getAvailableMB());
+  }
+
+  @Test
+  public void testPreemptionWithFreeResources() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"default\">");
+    out.println("<maxResources>0mb,0vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>1</weight>");
+    out.println("<minResources>1024mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+
+    startResourceManager(0f);
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
+
+    // Verify submitting another request triggers preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    scheduler.update();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    assertEquals("preemptResources() should have been called", 1024,
+        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+
+    resourceManager.stop();
+
+    startResourceManager(0.8f);
+    // Create node with 4GB memory and 4 vcores
+    registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
+
+    // Verify submitting another request doesn't trigger preemption
+    createSchedulingRequest(1024, "queueB", "user1", 1, 1);
+    scheduler.update();
+    clock.tick(6);
+
+    ((StubbedFairScheduler) scheduler).resetLastPreemptResources();
+    scheduler.preemptTasksIfNecessary();
+    assertEquals("preemptResources() should not have been called", -1,
+        ((StubbedFairScheduler) scheduler).lastPreemptMemory);
+  }
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

@@ -156,6 +156,12 @@ Properties that can be placed in yarn-site.xml
     * Whether to use preemption. Note that preemption is experimental in the current
       version. Defaults to false.
 
+ * <<<yarn.scheduler.fair.preemption.cluster-utilization-threshold>>>
+
+    * The utilization threshold after which preemption kicks in. The
+      utilization is computed as the maximum ratio of usage to capacity among
+      all resources. Defaults to 0.8.
+
  * <<<yarn.scheduler.fair.sizebasedweight>>>
   
     * Whether to assign shares to individual apps based on their size, rather than