Browse Source

YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. (Wei Yan via kasha)

Karthik Kambatla 10 năm trước cách đây
mục cha
commit
1dcaba9a7a
12 tập tin đã thay đổi với 412 bổ sung247 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 21 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
  3. 39 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
  4. 47 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  5. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  6. 19 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  7. 18 42
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  8. 5 27
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
  9. 43 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
  10. 170 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
  11. 34 124
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  12. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -61,6 +61,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2395. FairScheduler: Preemption timeout should be configurable per 
     queue. (Wei Yan via kasha)
 
+    YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue.
+    (Wei Yan via kasha)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

+ 21 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java

@@ -70,6 +70,12 @@ public class AllocationConfiguration {
   // allowed to preempt other jobs' tasks.
   private final Map<String, Long> fairSharePreemptionTimeouts;
 
+  // The fair share preemption threshold for each queue. If a queue waits
+  // fairSharePreemptionTimeout without receiving
+  // fairshare * fairSharePreemptionThreshold resources, it is allowed to
+  // preempt other queues' tasks.
+  private final Map<String, Float> fairSharePreemptionThresholds;
+
   private final Map<String, SchedulingPolicy> schedulingPolicies;
   
   private final SchedulingPolicy defaultSchedulingPolicy;
@@ -92,6 +98,7 @@ public class AllocationConfiguration {
       SchedulingPolicy defaultSchedulingPolicy,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls,
       QueuePlacementPolicy placementPolicy,
       Map<FSQueueType, Set<String>> configuredQueues) {
@@ -108,6 +115,7 @@ public class AllocationConfiguration {
     this.schedulingPolicies = schedulingPolicies;
     this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
     this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
+    this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
     this.queueAcls = queueAcls;
     this.placementPolicy = placementPolicy;
     this.configuredQueues = configuredQueues;
@@ -126,6 +134,7 @@ public class AllocationConfiguration {
     queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
     minSharePreemptionTimeouts = new HashMap<String, Long>();
     fairSharePreemptionTimeouts = new HashMap<String, Long>();
+    fairSharePreemptionThresholds = new HashMap<String, Float>();
     schedulingPolicies = new HashMap<String, SchedulingPolicy>();
     defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
     configuredQueues = new HashMap<FSQueueType, Set<String>>();
@@ -171,7 +180,18 @@ public class AllocationConfiguration {
     return (fairSharePreemptionTimeout == null) ?
         -1 : fairSharePreemptionTimeout;
   }
-  
+
+  /**
+   * Get a queue's fair share preemption threshold in the allocation file.
+   * Return -1f if not set.
+   */
+  public float getFairSharePreemptionThreshold(String queueName) {
+    Float fairSharePreemptionThreshold =
+        fairSharePreemptionThresholds.get(queueName);
+    return (fairSharePreemptionThreshold == null) ?
+        -1f : fairSharePreemptionThreshold;
+  }
+
   public ResourceWeights getQueueWeight(String queue) {
     ResourceWeights weight = queueWeights.get(queue);
     return (weight == null) ? ResourceWeights.NEUTRAL : weight;

+ 39 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java

@@ -218,6 +218,8 @@ public class AllocationFileLoaderService extends AbstractService {
     Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
+    Map<String, Float> fairSharePreemptionThresholds =
+        new HashMap<String, Float>();
     Map<String, Map<QueueACL, AccessControlList>> queueAcls =
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
@@ -225,6 +227,7 @@ public class AllocationFileLoaderService extends AbstractService {
     float queueMaxAMShareDefault = -1.0f;
     long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
     long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    float defaultFairSharePreemptionThreshold = 0.5f;
     SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
 
     QueuePlacementPolicy newPlacementPolicy = null;
@@ -277,7 +280,8 @@ public class AllocationFileLoaderService extends AbstractService {
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
           userMaxAppsDefault = val;
-        } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
+        } else if ("defaultFairSharePreemptionTimeout"
+            .equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           long val = Long.parseLong(text) * 1000L;
           defaultFairSharePreemptionTimeout = val;
@@ -287,10 +291,17 @@ public class AllocationFileLoaderService extends AbstractService {
             long val = Long.parseLong(text) * 1000L;
             defaultFairSharePreemptionTimeout = val;
           }
-        } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+        } else if ("defaultMinSharePreemptionTimeout"
+            .equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           long val = Long.parseLong(text) * 1000L;
           defaultMinSharePreemptionTimeout = val;
+        } else if ("defaultFairSharePreemptionThreshold"
+            .equals(element.getTagName())) {
+          String text = ((Text)element.getFirstChild()).getData().trim();
+          float val = Float.parseFloat(text);
+          val = Math.max(Math.min(val, 1.0f), 0.0f);
+          defaultFairSharePreemptionThreshold = val;
         } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
           String text = ((Text)element.getFirstChild()).getData().trim();
           int val = Integer.parseInt(text);
@@ -326,7 +337,7 @@ public class AllocationFileLoaderService extends AbstractService {
       loadQueue(parent, element, minQueueResources, maxQueueResources,
           queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
           queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
-          queueAcls, configuredQueues);
+          fairSharePreemptionThresholds, queueAcls, configuredQueues);
     }
 
     // Load placement policy and pass it configured queues
@@ -349,11 +360,18 @@ public class AllocationFileLoaderService extends AbstractService {
           defaultFairSharePreemptionTimeout);
     }
 
+    // Set the fair share preemption threshold for the root queue
+    if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) {
+      fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE,
+          defaultFairSharePreemptionThreshold);
+    }
+
     AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
         maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
         queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
         queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
-        minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
+        minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
+        fairSharePreemptionThresholds, queueAcls,
         newPlacementPolicy, configuredQueues);
     
     lastSuccessfulReload = clock.getTime();
@@ -365,13 +383,15 @@ public class AllocationFileLoaderService extends AbstractService {
   /**
    * Loads a queue from a queue element in the configuration file
    */
-  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+  private void loadQueue(String parentName, Element element,
+      Map<String, Resource> minQueueResources,
       Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
       Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
       Map<String, ResourceWeights> queueWeights,
       Map<String, SchedulingPolicy> queuePolicies,
       Map<String, Long> minSharePreemptionTimeouts,
       Map<String, Long> fairSharePreemptionTimeouts,
+      Map<String, Float> fairSharePreemptionThresholds,
       Map<String, Map<QueueACL, AccessControlList>> queueAcls, 
       Map<FSQueueType, Set<String>> configuredQueues) 
       throws AllocationConfigurationException {
@@ -418,6 +438,11 @@ public class AllocationFileLoaderService extends AbstractService {
         String text = ((Text)field.getFirstChild()).getData().trim();
         long val = Long.parseLong(text) * 1000L;
         fairSharePreemptionTimeouts.put(queueName, val);
+      } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        float val = Float.parseFloat(text);
+        val = Math.max(Math.min(val, 1.0f), 0.0f);
+        fairSharePreemptionThresholds.put(queueName, val);
       } else if ("schedulingPolicy".equals(field.getTagName())
           || "schedulingMode".equals(field.getTagName())) {
         String text = ((Text)field.getFirstChild()).getData().trim();
@@ -434,7 +459,8 @@ public class AllocationFileLoaderService extends AbstractService {
         loadQueue(queueName, field, minQueueResources, maxQueueResources,
             queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
             queuePolicies, minSharePreemptionTimeouts,
-            fairSharePreemptionTimeouts, queueAcls, configuredQueues);
+            fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
+            queueAcls, configuredQueues);
         configuredQueues.get(FSQueueType.PARENT).add(queueName);
         isLeaf = false;
       }
@@ -449,11 +475,15 @@ public class AllocationFileLoaderService extends AbstractService {
       }
     }
     queueAcls.put(queueName, acls);
-    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+    if (maxQueueResources.containsKey(queueName) &&
+        minQueueResources.containsKey(queueName)
         && !Resources.fitsIn(minQueueResources.get(queueName),
             maxQueueResources.get(queueName))) {
-      LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
-          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+      LOG.warn(
+          String.format(
+              "Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName),
+              minQueueResources.get(queueName)));
     }
   }
   

+ 47 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -54,7 +55,7 @@ public class FSLeafQueue extends FSQueue {
   
   // Variables used for preemption
   private long lastTimeAtMinShare;
-  private long lastTimeAtHalfFairShare;
+  private long lastTimeAtFairShareThreshold;
   
   // Track the AM resource usage for this queue
   private Resource amResourceUsage;
@@ -65,7 +66,7 @@ public class FSLeafQueue extends FSQueue {
       FSParentQueue parent) {
     super(name, scheduler, parent);
     this.lastTimeAtMinShare = scheduler.getClock().getTime();
-    this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
+    this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
     activeUsersManager = new ActiveUsersManager(getMetrics());
     amResourceUsage = Resource.newInstance(0, 0);
   }
@@ -275,16 +276,17 @@ public class FSLeafQueue extends FSQueue {
     return lastTimeAtMinShare;
   }
 
-  public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+  private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
     this.lastTimeAtMinShare = lastTimeAtMinShare;
   }
 
-  public long getLastTimeAtHalfFairShare() {
-    return lastTimeAtHalfFairShare;
+  public long getLastTimeAtFairShareThreshold() {
+    return lastTimeAtFairShareThreshold;
   }
 
-  public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
-    this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+  private void setLastTimeAtFairShareThreshold(
+      long lastTimeAtFairShareThreshold) {
+    this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
   }
 
   @Override
@@ -328,6 +330,20 @@ public class FSLeafQueue extends FSQueue {
     // TODO Auto-generated method stub
   }
 
+  /**
+   * Update the preemption fields for the queue, i.e. the times since last was
+   * at its guaranteed share and over its fair share threshold.
+   */
+  public void updateStarvationStats() {
+    long now = scheduler.getClock().getTime();
+    if (!isStarvedForMinShare()) {
+      setLastTimeAtMinShare(now);
+    }
+    if (!isStarvedForFairShare()) {
+      setLastTimeAtFairShareThreshold(now);
+    }
+  }
+
   /**
    * Helper method to check if the queue should preempt containers
    *
@@ -337,4 +353,28 @@ public class FSLeafQueue extends FSQueue {
     return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
         getFairShare());
   }
+
+  /**
+   * Is a queue being starved for its min share.
+   */
+  @VisibleForTesting
+  boolean isStarvedForMinShare() {
+    return isStarved(getMinShare());
+  }
+
+  /**
+   * Is a queue being starved for its fair share threshold.
+   */
+  @VisibleForTesting
+  boolean isStarvedForFairShare() {
+    return isStarved(
+        Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
+  }
+
+  private boolean isStarved(Resource share) {
+    Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), share, getDemand());
+    return Resources.lessThan(FairScheduler.getResourceCalculator(),
+        scheduler.getClusterResource(), getResourceUsage(), desiredShare);
+  }
 }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -78,11 +78,11 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public void updatePreemptionTimeouts() {
-    super.updatePreemptionTimeouts();
+  public void updatePreemptionVariables() {
+    super.updatePreemptionVariables();
     // For child queues
     for (FSQueue childQueue : childQueues) {
-      childQueue.updatePreemptionTimeouts();
+      childQueue.updatePreemptionVariables();
     }
   }
 

+ 19 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -54,6 +54,7 @@ public abstract class FSQueue implements Queue, Schedulable {
 
   private long fairSharePreemptionTimeout = Long.MAX_VALUE;
   private long minSharePreemptionTimeout = Long.MAX_VALUE;
+  private float fairSharePreemptionThreshold = 0.5f;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -186,6 +187,14 @@ public abstract class FSQueue implements Queue, Schedulable {
     this.minSharePreemptionTimeout = minSharePreemptionTimeout;
   }
 
+  public float getFairSharePreemptionThreshold() {
+    return fairSharePreemptionThreshold;
+  }
+
+  public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
+    this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
+  }
+
   /**
    * Recomputes the shares for all child queues and applications based on this
    * queue's current share
@@ -193,21 +202,27 @@ public abstract class FSQueue implements Queue, Schedulable {
   public abstract void recomputeShares();
 
   /**
-   * Update the min/fair share preemption timeouts for this queue.
+   * Update the min/fair share preemption timeouts and threshold for this queue.
    */
-  public void updatePreemptionTimeouts() {
-    // For min share
+  public void updatePreemptionVariables() {
+    // For min share timeout
     minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
         .getMinSharePreemptionTimeout(getName());
     if (minSharePreemptionTimeout == -1 && parent != null) {
       minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
     }
-    // For fair share
+    // For fair share timeout
     fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
         .getFairSharePreemptionTimeout(getName());
     if (fairSharePreemptionTimeout == -1 && parent != null) {
       fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
     }
+    // For fair share preemption threshold
+    fairSharePreemptionThreshold = scheduler.getAllocationConfiguration()
+        .getFairSharePreemptionThreshold(getName());
+    if (fairSharePreemptionThreshold < 0 && parent != null) {
+      fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
+    }
   }
 
   /**

+ 18 - 42
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -299,7 +299,7 @@ public class FairScheduler extends
    */
   protected synchronized void update() {
     long start = getClock().getTime();
-    updatePreemptionVariables(); // Determine if any queues merit preemption
+    updateStarvationStats(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
 
@@ -329,48 +329,20 @@ public class FairScheduler extends
 
   /**
    * Update the preemption fields for all QueueScheduables, i.e. the times since
-   * each queue last was at its guaranteed share and at > 1/2 of its fair share
-   * for each type of task.
+   * each queue last was at its guaranteed share and over its fair share
+   * threshold for each type of task.
    */
-  private void updatePreemptionVariables() {
-    long now = getClock().getTime();
-    lastPreemptionUpdateTime = now;
+  private void updateStarvationStats() {
+    lastPreemptionUpdateTime = clock.getTime();
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
-      if (!isStarvedForMinShare(sched)) {
-        sched.setLastTimeAtMinShare(now);
-      }
-      if (!isStarvedForFairShare(sched)) {
-        sched.setLastTimeAtHalfFairShare(now);
-      }
+      sched.updateStarvationStats();
     }
   }
 
-  /**
-   * Is a queue below its min share for the given task type?
-   */
-  boolean isStarvedForMinShare(FSLeafQueue sched) {
-    Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
-      sched.getMinShare(), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
-        sched.getResourceUsage(), desiredShare);
-  }
-
-  /**
-   * Is a queue being starved for fair share for the given task type? This is
-   * defined as being below half its fair share.
-   */
-  boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
-        clusterResource,
-        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
-    return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
-        sched.getResourceUsage(), desiredFairShare);
-  }
-
   /**
    * Check for queues that need tasks preempted, either because they have been
    * below their guaranteed share for minSharePreemptionTimeout or they have
-   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * been below their fair share threshold for the fairSharePreemptionTimeout. If
    * such queues exist, compute how many tasks of each type need to be preempted
    * and then select the right ones using preemptTasks.
    */
@@ -499,11 +471,11 @@ public class FairScheduler extends
    * Return the resource amount that this queue is allowed to preempt, if any.
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
-   * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
-   * full fair share. If both conditions hold, we preempt the max of the two
-   * amounts (this shouldn't happen unless someone sets the timeouts to be
-   * identical for some reason).
+   * this min share. If it has been below its fair share preemption threshold
+   * for at least the fairSharePreemptionTimeout, it should preempt enough tasks
+   * to get up to its full fair share. If both conditions hold, we preempt the
+   * max of the two amounts (this shouldn't happen unless someone sets the
+   * timeouts to be identical for some reason).
    */
   protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
     long minShareTimeout = sched.getMinSharePreemptionTimeout();
@@ -516,7 +488,7 @@ public class FairScheduler extends
       resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
           Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
     }
-    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
       Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
           sched.getFairShare(), sched.getDemand());
       resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
@@ -1094,7 +1066,11 @@ public class FairScheduler extends
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
-  
+
+  public static ResourceCalculator getResourceCalculator() {
+    return RESOURCE_CALCULATOR;
+  }
+
   /**
    * Subqueue metrics might be a little out of date because fair shares are
    * recalculated at the update interval, but the root queue metrics needs to

+ 5 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

@@ -181,7 +181,7 @@ public class QueueManager {
         parent.addChildQueue(leafQueue);
         queues.put(leafQueue.getName(), leafQueue);
         leafQueues.add(leafQueue);
-        setPreemptionTimeout(leafQueue, parent, queueConf);
+        leafQueue.updatePreemptionVariables();
         return leafQueue;
       } else {
         FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
@@ -193,7 +193,7 @@ public class QueueManager {
         }
         parent.addChildQueue(newParent);
         queues.put(newParent.getName(), newParent);
-        setPreemptionTimeout(newParent, parent, queueConf);
+        newParent.updatePreemptionVariables();
         parent = newParent;
       }
     }
@@ -201,29 +201,6 @@ public class QueueManager {
     return parent;
   }
 
-  /**
-   * Set the min/fair share preemption timeouts for the given queue.
-   * If the timeout is configured in the allocation file, the queue will use
-   * that value; otherwise, the queue inherits the value from its parent queue.
-   */
-  private void setPreemptionTimeout(FSQueue queue,
-      FSParentQueue parentQueue, AllocationConfiguration queueConf) {
-    // For min share
-    long minSharePreemptionTimeout =
-        queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
-    if (minSharePreemptionTimeout == -1) {
-      minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout();
-    }
-    queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout);
-    // For fair share
-    long fairSharePreemptionTimeout =
-        queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
-    if (fairSharePreemptionTimeout == -1) {
-      fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout();
-    }
-    queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout);
-  }
-
   /**
    * Make way for the given queue if possible, by removing incompatible
    * queues with no apps in them. Incompatibility could be due to
@@ -409,7 +386,8 @@ public class QueueManager {
 
     // Update steady fair shares for all queues
     rootQueue.recomputeSteadyShares();
-    // Update the fair share preemption timeouts for all queues recursively
-    rootQueue.updatePreemptionTimeouts();
+    // Update the fair share preemption timeouts and preemption for all queues
+    // recursively
+    rootQueue.updatePreemptionVariables();
   }
 }

+ 43 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java

@@ -187,13 +187,15 @@ public class TestAllocationFileLoaderService {
     out.println("<queue name=\"queueF\" type=\"parent\" >");
     out.println("</queue>");
     // Create hierarchical queues G,H, with different min/fair share preemption
-    // timeouts
+    // timeouts and preemption thresholds
     out.println("<queue name=\"queueG\">");
     out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
     out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
     out.println("   <queue name=\"queueH\">");
     out.println("   <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
     out.println("   <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
+    out.println("   <fairSharePreemptionThreshold>0.7</fairSharePreemptionThreshold>");
     out.println("   </queue>");
     out.println("</queue>");
     // Set default limit of apps per queue to 15
@@ -211,6 +213,8 @@ public class TestAllocationFileLoaderService {
         + "</defaultMinSharePreemptionTimeout>");
     // Set default fair share preemption timeout to 5 minutes
     out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
+    // Set default fair share preemption threshold to 0.4
+    out.println("<defaultFairSharePreemptionThreshold>0.4</defaultFairSharePreemptionThreshold>");
     // Set default scheduling policy to DRF
     out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
     out.println("</allocations>");
@@ -299,6 +303,26 @@ public class TestAllocationFileLoaderService {
     assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
     assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
 
+    assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
+    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." +
+        YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01);
+    assertEquals(.6f,
+        queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01);
+    assertEquals(.7f,
+        queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01);
+
     assertTrue(queueConf.getConfiguredQueues()
         .get(FSQueueType.PARENT)
         .contains("root.queueF"));
@@ -346,9 +370,10 @@ public class TestAllocationFileLoaderService {
     out.println("<pool name=\"queueD\">");
     out.println("<maxRunningApps>3</maxRunningApps>");
     out.println("</pool>");
-    // Give queue E a preemption timeout of one minute
+    // Give queue E a preemption timeout of one minute and 0.3f threshold
     out.println("<pool name=\"queueE\">");
     out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>");
     out.println("</pool>");
     // Set default limit of apps per queue to 15
     out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
@@ -363,6 +388,8 @@ public class TestAllocationFileLoaderService {
         + "</defaultMinSharePreemptionTimeout>");
     // Set fair share preemption timeout to 5 minutes
     out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
+    // Set default fair share preemption threshold to 0.6f
+    out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
     
@@ -429,6 +456,20 @@ public class TestAllocationFileLoaderService {
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
     assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
+
+    assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
+    assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root."
+        + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
+    assertEquals(-1,
+        queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
+    assertEquals(.3f,
+        queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
   }
   
   @Test

+ 170 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java

@@ -18,50 +18,66 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-public class TestFSLeafQueue {
-  private FSLeafQueue schedulable = null;
-  private Resource maxResource = Resources.createResource(10);
+public class TestFSLeafQueue extends FairSchedulerTestBase {
+  private final static String ALLOC_FILE = new File(TEST_DIR,
+      TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
+  private Resource maxResource = Resources.createResource(1024 * 8);
 
   @Before
   public void setup() throws IOException {
-    FairScheduler scheduler = new FairScheduler();
-    Configuration conf = createConfiguration();
-    // All tests assume only one assignment per node update
-    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
-    ResourceManager resourceManager = new ResourceManager();
-    resourceManager.init(conf);
-    ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-    
-    String queueName = "root.queue1";
-    scheduler.allocConf = mock(AllocationConfiguration.class);
-    when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
-    when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
+    conf = createConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+  }
 
-    schedulable = new FSLeafQueue(queueName, scheduler, null);
+  @After
+  public void teardown() {
+    if (resourceManager != null) {
+      resourceManager.stop();
+      resourceManager = null;
+    }
+    conf = null;
   }
 
   @Test
   public void testUpdateDemand() {
+    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.allocConf = mock(AllocationConfiguration.class);
+
+    String queueName = "root.queue1";
+    when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
+    when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
+    FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
+
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(maxResource);
 
@@ -73,11 +89,137 @@ public class TestFSLeafQueue {
     assertTrue("Demand is greater than max allowed ",
         Resources.equals(schedulable.getDemand(), maxResource));
   }
-  
-  private Configuration createConfiguration() {
-    Configuration conf = new YarnConfiguration();
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
-        ResourceScheduler.class);
-    return conf;
+
+  @Test (timeout = 5000)
+  public void test() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<minResources>2048mb,0vcores</minResources>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    scheduler.update();
+
+    // Queue A wants 3 * 1024. Node update gives this all to A
+    createSchedulingRequest(3 * 1024, "queueA", "user1");
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(nodeEvent2);
+
+    // Queue B arrives and wants 1 * 1024
+    createSchedulingRequest(1 * 1024, "queueB", "user1");
+    scheduler.update();
+    Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
+    assertEquals(3, queues.size());
+
+    // Queue A should be above min share, B below.
+    FSLeafQueue queueA =
+        scheduler.getQueueManager().getLeafQueue("queueA", false);
+    FSLeafQueue queueB =
+        scheduler.getQueueManager().getLeafQueue("queueB", false);
+    assertFalse(queueA.isStarvedForMinShare());
+    assertTrue(queueB.isStarvedForMinShare());
+
+    // Node checks in again, should allocate for B
+    scheduler.handle(nodeEvent2);
+    // Now B should have min share ( = demand here)
+    assertFalse(queueB.isStarvedForMinShare());
+  }
+
+  @Test (timeout = 5000)
+  public void testIsStarvedForFairShare() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queueA\">");
+    out.println("<weight>.2</weight>");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB\">");
+    out.println("<weight>.8</weight>");
+    out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
+    out.println("<queue name=\"queueB1\">");
+    out.println("</queue>");
+    out.println("<queue name=\"queueB2\">");
+    out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
+    out.println("</allocations>");
+    out.close();
+
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    // Add one big node (only care about aggregate capacity)
+    RMNode node1 =
+        MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
+            "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    scheduler.update();
+
+    // Queue A wants 4 * 1024. Node update gives this all to A
+    createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
+    scheduler.update();
+    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
+    for (int i = 0; i < 4; i ++) {
+      scheduler.handle(nodeEvent2);
+    }
+
+    QueueManager queueMgr = scheduler.getQueueManager();
+    FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
+    assertEquals(4 * 1024, queueA.getResourceUsage().getMemory());
+
+    // Both queue B1 and queue B2 want 3 * 1024
+    createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
+    createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
+    scheduler.update();
+    for (int i = 0; i < 4; i ++) {
+      scheduler.handle(nodeEvent2);
+    }
+
+    FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
+    FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
+    assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory());
+    assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory());
+
+    // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
+    // threshold is 1.6 * 1024
+    assertFalse(queueB1.isStarvedForFairShare());
+
+    // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
+    // threshold is 2.4 * 1024
+    assertTrue(queueB2.isStarvedForFairShare());
+
+    // Node checks in again
+    scheduler.handle(nodeEvent2);
+    scheduler.handle(nodeEvent2);
+    assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory());
+    assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory());
+
+    // Both queue B1 and queue B2 usages go to 3 * 1024
+    assertFalse(queueB1.isStarvedForFairShare());
+    assertFalse(queueB2.isStarvedForFairShare());
   }
 }

+ 34 - 124
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -1061,9 +1061,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     out.println("  </queue>");
     out.println("  <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
     out.println("  <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
+    out.println("  <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
     out.println("</queue>");
     out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
     out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.6</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1080,125 +1082,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     assertEquals(100000, root.getFairSharePreemptionTimeout());
     assertEquals(120000, root.getMinSharePreemptionTimeout());
-  }
-  
-  @Test (timeout = 5000)
-  public void testIsStarvedForMinShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<minResources>2048mb,0vcores</minResources>");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Queue A wants 3 * 1024. Node update gives this all to A
-    createSchedulingRequest(3 * 1024, "queueA", "user1");
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    // Queue B arrives and wants 1 * 1024
-    createSchedulingRequest(1 * 1024, "queueB", "user1");
-    scheduler.update();
-    Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
-    assertEquals(3, queues.size());
-
-    // Queue A should be above min share, B below.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueA")) {
-        assertEquals(false, scheduler.isStarvedForMinShare(p));
-      }
-      else if (p.getName().equals("root.queueB")) {
-        assertEquals(true, scheduler.isStarvedForMinShare(p));
-      }
-    }
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // Now B should have min share ( = demand here)
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueB")) {
-        assertEquals(false, scheduler.isStarvedForMinShare(p));
-      }
-    }
-  }
-
-  @Test (timeout = 5000)
-  public void testIsStarvedForFairShare() throws Exception {
-    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
-    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
-    out.println("<?xml version=\"1.0\"?>");
-    out.println("<allocations>");
-    out.println("<queue name=\"queueA\">");
-    out.println("<weight>.25</weight>");
-    out.println("</queue>");
-    out.println("<queue name=\"queueB\">");
-    out.println("<weight>.75</weight>");
-    out.println("</queue>");
-    out.println("</allocations>");
-    out.close();
-
-    scheduler.init(conf);
-    scheduler.start();
-    scheduler.reinitialize(conf, resourceManager.getRMContext());
-
-    // Add one big node (only care about aggregate capacity)
-    RMNode node1 =
-        MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
-            "127.0.0.1");
-    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
-    scheduler.handle(nodeEvent1);
-
-    // Queue A wants 3 * 1024. Node update gives this all to A
-    createSchedulingRequest(3 * 1024, "queueA", "user1");
-    scheduler.update();
-    NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
-    scheduler.handle(nodeEvent2);
-
-    // Queue B arrives and wants 1 * 1024
-    createSchedulingRequest(1 * 1024, "queueB", "user1");
-    scheduler.update();
-    Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
-    assertEquals(3, queues.size());
-
-    // Queue A should be above fair share, B below.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueA")) {
-        assertEquals(false, scheduler.isStarvedForFairShare(p));
-      }
-      else if (p.getName().equals("root.queueB")) {
-        assertEquals(true, scheduler.isStarvedForFairShare(p));
-      }
-    }
-
-    // Node checks in again, should allocate for B
-    scheduler.handle(nodeEvent2);
-    // B should not be starved for fair share, since entire demand is
-    // satisfied.
-    for (FSLeafQueue p : queues) {
-      if (p.getName().equals("root.queueB")) {
-        assertEquals(false, scheduler.isStarvedForFairShare(p));
-      }
-    }
+    assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01);
   }
 
   @Test (timeout = 5000)
@@ -1385,7 +1269,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     out.println("<queue name=\"queueB\">");
     out.println("<weight>2</weight>");
     out.println("</queue>");
-    out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1468,8 +1353,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     out.println("<weight>.25</weight>");
     out.println("<minResources>1024mb,0vcores</minResources>");
     out.println("</queue>");
-    out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
-    out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
+    out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
     out.println("</allocations>");
     out.close();
 
@@ -1753,8 +1639,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
   @Test
   public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-    MockClock clock = new MockClock();
-    scheduler.setClock(clock);
 
     PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
     out.println("<?xml version=\"1.0\"?>");
@@ -1842,6 +1726,32 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .getFairSharePreemptionTimeout());
   }
 
+  @Test
+  public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception {
+    conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Set preemption variables for the root queue
+    FSParentQueue root = scheduler.getQueueManager().getRootQueue();
+    root.setMinSharePreemptionTimeout(10000);
+    root.setFairSharePreemptionTimeout(15000);
+    root.setFairSharePreemptionThreshold(.6f);
+
+    // User1 submits one application
+    ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(appAttemptId, "default", "user1", null);
+
+    // The user1 queue should inherit the configurations from the root queue
+    FSLeafQueue userQueue =
+        scheduler.getQueueManager().getLeafQueue("user1", true);
+    assertEquals(1, userQueue.getRunnableAppSchedulables().size());
+    assertEquals(10000, userQueue.getMinSharePreemptionTimeout());
+    assertEquals(15000, userQueue.getFairSharePreemptionTimeout());
+    assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001);
+  }
+
   @Test (timeout = 5000)
   public void testMultipleContainersWaitingForReservation() throws IOException {
     scheduler.init(conf);

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm

@@ -277,6 +277,12 @@ Allocation file format
      threshold before it will try to preempt containers to take resources from other
      queues. If not set, the queue will inherit the value from its parent queue.
 
+   * fairSharePreemptionThreshold: the fair share preemption threshold for the
+     queue. If the queue waits fairSharePreemptionTimeout without receiving
+     fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt
+     containers to take resources from other queues. If not set, the queue will
+     inherit the value from its parent queue.
+
  * <<User elements>>, which represent settings governing the behavior of individual 
      users. They can contain a single property: maxRunningApps, a limit on the 
      number of running apps for a particular user.
@@ -292,6 +298,10 @@ Allocation file format
    preemption timeout for the root queue; overridden by minSharePreemptionTimeout
    element in root queue.
 
+ * <<A defaultFairSharePreemptionThreshold element>>, which sets the fair share
+   preemption threshold for the root queue; overridden by fairSharePreemptionThreshold
+   element in root queue.
+
  * <<A queueMaxAppsDefault element>>, which sets the default running app limit
    for queues; overriden by maxRunningApps element in each queue.