Explorar el Código

YARN-6163. FS Preemption is a trickle for severely starved applications. (kasha)

(cherry picked from commit 6c25dbcdc0517a825b92fb16444aa1d3761e160c)
Karthik Kambatla hace 8 años
padre
commit
46b6c95e0a
Se han modificado 12 ficheros con 584 adiciones y 139 borrados
  1. 18 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
  2. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  3. 98 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  4. 77 34
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  5. 65 60
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
  6. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  7. 18 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
  8. 146 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java
  9. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java
  10. 15 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java
  11. 23 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
  12. 112 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java

+ 18 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java

@@ -182,6 +182,24 @@ public class Resources {
     return subtractFrom(clone(lhs), rhs);
     return subtractFrom(clone(lhs), rhs);
   }
   }
 
 
+  /**
+   * Subtract <code>rhs</code> from <code>lhs</code> and reset any negative
+   * values to zero.
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractFromNonNegative(Resource lhs, Resource rhs) {
+    subtractFrom(lhs, rhs);
+    if (lhs.getMemorySize() < 0) {
+      lhs.setMemorySize(0);
+    }
+    if (lhs.getVirtualCores() < 0) {
+      lhs.setVirtualCores(0);
+    }
+    return lhs;
+  }
+
   public static Resource negate(Resource resource) {
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
     return subtract(NONE, resource);
   }
   }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -120,6 +120,7 @@ public abstract class AbstractYarnScheduler
    */
    */
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected ConcurrentMap<ApplicationId, SchedulerApplication<T>> applications;
   protected int nmExpireInterval;
   protected int nmExpireInterval;
+  protected long nmHeartbeatInterval;
 
 
   protected final static List<Container> EMPTY_CONTAINER_LIST =
   protected final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
       new ArrayList<Container>();
@@ -156,6 +157,9 @@ public abstract class AbstractYarnScheduler
     nmExpireInterval =
     nmExpireInterval =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
           YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+    nmHeartbeatInterval =
+        conf.getLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
     long configuredMaximumAllocationWaitTime =
     long configuredMaximumAllocationWaitTime =
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
         conf.getLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);
           YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS);

+ 98 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -86,6 +86,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
   private Resource fairshareStarvation = Resources.none();
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
   private long lastTimeAtFairShare;
+  private long nextStarvationCheck;
 
 
   // minShareStarvation attributed to this application by the leaf queue
   // minShareStarvation attributed to this application by the leaf queue
   private Resource minshareStarvation = Resources.none();
   private Resource minshareStarvation = Resources.none();
@@ -210,15 +211,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
       blacklistNodeIds.addAll(scheduler.getBlacklistedNodes(this));
     }
     }
     for (FSSchedulerNode node: blacklistNodeIds) {
     for (FSSchedulerNode node: blacklistNodeIds) {
-      Resources.subtractFrom(availableResources,
+      Resources.subtractFromNonNegative(availableResources,
           node.getUnallocatedResource());
           node.getUnallocatedResource());
     }
     }
-    if (availableResources.getMemorySize() < 0) {
-      availableResources.setMemorySize(0);
-    }
-    if (availableResources.getVirtualCores() < 0) {
-      availableResources.setVirtualCores(0);
-    }
   }
   }
 
 
   /**
   /**
@@ -528,6 +523,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return Resources.add(fairshareStarvation, minshareStarvation);
     return Resources.add(fairshareStarvation, minshareStarvation);
   }
   }
 
 
+  /**
+   * Get last computed fairshare starvation.
+   *
+   * @return last computed fairshare starvation
+   */
+  Resource getFairshareStarvation() {
+    return fairshareStarvation;
+  }
+
   /**
   /**
    * Set the minshare attributed to this application. To be called only from
    * Set the minshare attributed to this application. To be called only from
    * {@link FSLeafQueue#updateStarvedApps}.
    * {@link FSLeafQueue#updateStarvedApps}.
@@ -1066,17 +1070,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
   }
 
 
   /**
   /**
-   * Helper method that computes the extent of fairshare fairshareStarvation.
+   * Helper method that computes the extent of fairshare starvation.
+   * @return freshly computed fairshare starvation
    */
    */
   Resource fairShareStarvation() {
   Resource fairShareStarvation() {
     Resource threshold = Resources.multiply(
     Resource threshold = Resources.multiply(
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
         getFairShare(), fsQueue.getFairSharePreemptionThreshold());
-    Resource starvation = Resources.subtractFrom(threshold, getResourceUsage());
+    Resource starvation = Resources.componentwiseMin(threshold, demand);
+    Resources.subtractFromNonNegative(starvation, getResourceUsage());
 
 
     long now = scheduler.getClock().getTime();
     long now = scheduler.getClock().getTime();
-    boolean starved = Resources.greaterThan(
-        fsQueue.getPolicy().getResourceCalculator(),
-        scheduler.getClusterResource(), starvation, Resources.none());
+    boolean starved = !Resources.isNone(starvation);
 
 
     if (!starved) {
     if (!starved) {
       lastTimeAtFairShare = now;
       lastTimeAtFairShare = now;
@@ -1104,6 +1108,81 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return !Resources.isNone(fairshareStarvation);
     return !Resources.isNone(fairshareStarvation);
   }
   }
 
 
+  /**
+   * Fetch a list of RRs corresponding to the extent the app is starved
+   * (fairshare and minshare). This method considers the number of containers
+   * in a RR and also only one locality-level (the first encountered
+   * resourceName).
+   *
+   * @return list of {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   */
+  List<ResourceRequest> getStarvedResourceRequests() {
+    // List of RRs we build in this method to return
+    List<ResourceRequest> ret = new ArrayList<>();
+
+    // Track visited RRs to avoid the same RR at multiple locality levels
+    VisitedResourceRequestTracker visitedRRs =
+        new VisitedResourceRequestTracker(scheduler.getNodeTracker());
+
+    // Start with current starvation and track the pending amount
+    Resource pending = getStarvation();
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      if (Resources.isNone(pending)) {
+        // Found enough RRs to match the starvation
+        break;
+      }
+
+      // See if we have already seen this RR
+      if (!visitedRRs.visit(rr)) {
+        continue;
+      }
+
+      // A RR can have multiple containers of a capability. We need to
+      // compute the number of containers that fit in "pending".
+      int numContainersThatFit = (int) Math.floor(
+          Resources.ratio(scheduler.getResourceCalculator(),
+              pending, rr.getCapability()));
+      if (numContainersThatFit == 0) {
+        // This RR's capability is too large to fit in pending
+        continue;
+      }
+
+      // If the RR is only partially being satisfied, include only the
+      // partial number of containers.
+      if (numContainersThatFit < rr.getNumContainers()) {
+        rr = ResourceRequest.newInstance(rr.getPriority(),
+            rr.getResourceName(), rr.getCapability(), numContainersThatFit);
+      }
+
+      // Add the RR to return list and adjust "pending" accordingly
+      ret.add(rr);
+      Resources.subtractFromNonNegative(pending,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+
+    return ret;
+  }
+
+  /**
+   * Notify this app that preemption has been triggered to make room for
+   * outstanding demand. The app should not be considered starved until after
+   * the specified delay.
+   *
+   * @param delayBeforeNextStarvationCheck duration to wait
+   */
+  void preemptionTriggered(long delayBeforeNextStarvationCheck) {
+    nextStarvationCheck =
+        scheduler.getClock().getTime() + delayBeforeNextStarvationCheck;
+  }
+
+  /**
+   * Whether this app's starvation should be considered.
+   */
+  boolean shouldCheckForStarvation() {
+    return scheduler.getClock().getTime() >= nextStarvationCheck;
+  }
+
   /* Schedulable methods implementation */
   /* Schedulable methods implementation */
 
 
   @Override
   @Override
@@ -1116,6 +1195,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return demand;
     return demand;
   }
   }
 
 
+  /**
+   * Get the current app's unsatisfied demand.
+   */
+  Resource getPendingDemand() {
+    return Resources.subtract(demand, getResourceUsage());
+  }
+
   @Override
   @Override
   public long getStartTime() {
   public long getStartTime() {
     return startTime;
     return startTime;

+ 77 - 34
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -220,54 +220,53 @@ public class FSLeafQueue extends FSQueue {
   }
   }
 
 
   /**
   /**
-   * Helper method to identify starved applications. This needs to be called
-   * ONLY from {@link #updateInternal}, after the application shares
-   * are updated.
-   *
-   * A queue can be starving due to fairshare or minshare.
-   *
-   * Minshare is defined only on the queue and not the applications.
-   * Fairshare is defined for both the queue and the applications.
-   *
-   * If this queue is starved due to minshare, we need to identify the most
-   * deserving apps if they themselves are not starved due to fairshare.
+   * Compute the extent of fairshare starvation for a set of apps.
    *
    *
-   * If this queue is starving due to fairshare, there must be at least
-   * one application that is starved. And, even if the queue is not
-   * starved due to fairshare, there might still be starved applications.
+   * @param appsWithDemand apps to compute fairshare starvation for
+   * @return aggregate fairshare starvation for all apps
    */
    */
-  private void updateStarvedApps() {
-    // First identify starved applications and track total amount of
-    // starvation (in resources)
+  private Resource updateStarvedAppsFairshare(
+      TreeSet<FSAppAttempt> appsWithDemand) {
     Resource fairShareStarvation = Resources.clone(none());
     Resource fairShareStarvation = Resources.clone(none());
-
     // Fetch apps with unmet demand sorted by fairshare starvation
     // Fetch apps with unmet demand sorted by fairshare starvation
-    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand();
     for (FSAppAttempt app : appsWithDemand) {
     for (FSAppAttempt app : appsWithDemand) {
       Resource appStarvation = app.fairShareStarvation();
       Resource appStarvation = app.fairShareStarvation();
-      if (!Resources.equals(Resources.none(), appStarvation))  {
+      if (!Resources.isNone(appStarvation))  {
         context.getStarvedApps().addStarvedApp(app);
         context.getStarvedApps().addStarvedApp(app);
         Resources.addTo(fairShareStarvation, appStarvation);
         Resources.addTo(fairShareStarvation, appStarvation);
       } else {
       } else {
         break;
         break;
       }
       }
     }
     }
+    return fairShareStarvation;
+  }
 
 
-    // Compute extent of minshare starvation
-    Resource minShareStarvation = minShareStarvation();
-
-    // Compute minshare starvation that is not subsumed by fairshare starvation
-    Resources.subtractFrom(minShareStarvation, fairShareStarvation);
+  /**
+   * Distribute minshare starvation to a set of apps
+   * @param appsWithDemand set of apps
+   * @param minShareStarvation minshare starvation to distribute
+   */
+  private void updateStarvedAppsMinshare(
+      final TreeSet<FSAppAttempt> appsWithDemand,
+      final Resource minShareStarvation) {
+    Resource pending = Resources.clone(minShareStarvation);
 
 
     // Keep adding apps to the starved list until the unmet demand goes over
     // Keep adding apps to the starved list until the unmet demand goes over
     // the remaining minshare
     // the remaining minshare
     for (FSAppAttempt app : appsWithDemand) {
     for (FSAppAttempt app : appsWithDemand) {
-      if (Resources.greaterThan(policy.getResourceCalculator(),
-          scheduler.getClusterResource(), minShareStarvation, none())) {
-        Resource appPendingDemand =
-            Resources.subtract(app.getDemand(), app.getResourceUsage());
-        Resources.subtractFrom(minShareStarvation, appPendingDemand);
-        app.setMinshareStarvation(appPendingDemand);
+      if (!Resources.isNone(pending)) {
+        Resource appMinShare = app.getPendingDemand();
+        Resources.subtractFromNonNegative(
+            appMinShare, app.getFairshareStarvation());
+
+        if (Resources.greaterThan(policy.getResourceCalculator(),
+            scheduler.getClusterResource(), appMinShare, pending)) {
+          Resources.subtractFromNonNegative(appMinShare, pending);
+          pending = none();
+        } else {
+          Resources.subtractFromNonNegative(pending, appMinShare);
+        }
+        app.setMinshareStarvation(appMinShare);
         context.getStarvedApps().addStarvedApp(app);
         context.getStarvedApps().addStarvedApp(app);
       } else {
       } else {
         // Reset minshare starvation in case we had set it in a previous
         // Reset minshare starvation in case we had set it in a previous
@@ -277,6 +276,40 @@ public class FSLeafQueue extends FSQueue {
     }
     }
   }
   }
 
 
+  /**
+   * Helper method to identify starved applications. This needs to be called
+   * ONLY from {@link #updateInternal}, after the application shares
+   * are updated.
+   *
+   * A queue can be starving due to fairshare or minshare.
+   *
+   * Minshare is defined only on the queue and not the applications.
+   * Fairshare is defined for both the queue and the applications.
+   *
+   * If this queue is starved due to minshare, we need to identify the most
+   * deserving apps if they themselves are not starved due to fairshare.
+   *
+   * If this queue is starving due to fairshare, there must be at least
+   * one application that is starved. And, even if the queue is not
+   * starved due to fairshare, there might still be starved applications.
+   */
+  private void updateStarvedApps() {
+    // Fetch apps with pending demand
+    TreeSet<FSAppAttempt> appsWithDemand = fetchAppsWithDemand(false);
+
+    // Process apps with fairshare starvation
+    Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand);
+
+    // Compute extent of minshare starvation
+    Resource minShareStarvation = minShareStarvation();
+
+    // Compute minshare starvation that is not subsumed by fairshare starvation
+    Resources.subtractFromNonNegative(minShareStarvation, fairShareStarvation);
+
+    // Assign this minshare to apps with pending demand over fairshare
+    updateStarvedAppsMinshare(appsWithDemand, minShareStarvation);
+  }
+
   @Override
   @Override
   public Resource getDemand() {
   public Resource getDemand() {
     return demand;
     return demand;
@@ -352,7 +385,7 @@ public class FSLeafQueue extends FSQueue {
       return assigned;
       return assigned;
     }
     }
 
 
-    for (FSAppAttempt sched : fetchAppsWithDemand()) {
+    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
         continue;
       }
       }
@@ -368,14 +401,24 @@ public class FSLeafQueue extends FSQueue {
     return assigned;
     return assigned;
   }
   }
 
 
-  private TreeSet<FSAppAttempt> fetchAppsWithDemand() {
+  /**
+   * Fetch the subset of apps that have unmet demand. When used for
+   * preemption-related code (as opposed to allocation), omits apps that
+   * should not be checked for starvation.
+   *
+   * @param assignment whether the apps are for allocation containers, as
+   *                   opposed to preemption calculations
+   * @return Set of apps with unmet demand
+   */
+  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
     TreeSet<FSAppAttempt> pendingForResourceApps =
     TreeSet<FSAppAttempt> pendingForResourceApps =
         new TreeSet<>(policy.getComparator());
         new TreeSet<>(policy.getComparator());
     readLock.lock();
     readLock.lock();
     try {
     try {
       for (FSAppAttempt app : runnableApps) {
       for (FSAppAttempt app : runnableApps) {
         Resource pending = app.getAppAttemptResourceUsage().getPending();
         Resource pending = app.getAppAttemptResourceUsage().getPending();
-        if (!pending.equals(none())) {
+        if (!Resources.isNone(pending) &&
+            (assignment || app.shouldCheckForStarvation())) {
           pendingForResourceApps.add(app);
           pendingForResourceApps.add(app);
         }
         }
       }
       }

+ 65 - 60
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java

@@ -41,20 +41,26 @@ class FSPreemptionThread extends Thread {
   protected final FSContext context;
   protected final FSContext context;
   private final FairScheduler scheduler;
   private final FairScheduler scheduler;
   private final long warnTimeBeforeKill;
   private final long warnTimeBeforeKill;
+  private final long delayBeforeNextStarvationCheck;
   private final Timer preemptionTimer;
   private final Timer preemptionTimer;
 
 
   FSPreemptionThread(FairScheduler scheduler) {
   FSPreemptionThread(FairScheduler scheduler) {
+    setDaemon(true);
+    setName("FSPreemptionThread");
     this.scheduler = scheduler;
     this.scheduler = scheduler;
     this.context = scheduler.getContext();
     this.context = scheduler.getContext();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
     FairSchedulerConfiguration fsConf = scheduler.getConf();
     context.setPreemptionEnabled();
     context.setPreemptionEnabled();
     context.setPreemptionUtilizationThreshold(
     context.setPreemptionUtilizationThreshold(
         fsConf.getPreemptionUtilizationThreshold());
         fsConf.getPreemptionUtilizationThreshold());
-    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
     preemptionTimer = new Timer("Preemption Timer", true);
     preemptionTimer = new Timer("Preemption Timer", true);
 
 
-    setDaemon(true);
-    setName("FSPreemptionThread");
+    warnTimeBeforeKill = fsConf.getWaitTimeBeforeKill();
+    long allocDelay = (fsConf.isContinuousSchedulingEnabled()
+        ? 10 * fsConf.getContinuousSchedulingSleepMs() // 10 runs
+        : 4 * scheduler.getNMHeartbeatInterval()); // 4 heartbeats
+    delayBeforeNextStarvationCheck = warnTimeBeforeKill + allocDelay +
+        fsConf.getWaitTimeBeforeNextStarvationCheck();
   }
   }
 
 
   public void run() {
   public void run() {
@@ -62,13 +68,8 @@ class FSPreemptionThread extends Thread {
       FSAppAttempt starvedApp;
       FSAppAttempt starvedApp;
       try{
       try{
         starvedApp = context.getStarvedApps().take();
         starvedApp = context.getStarvedApps().take();
-        if (!Resources.isNone(starvedApp.getStarvation())) {
-          PreemptableContainers containers =
-              identifyContainersToPreempt(starvedApp);
-          if (containers != null) {
-            preemptContainers(containers.containers);
-          }
-        }
+        preemptContainers(identifyContainersToPreempt(starvedApp));
+        starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
       } catch (InterruptedException e) {
       } catch (InterruptedException e) {
         LOG.info("Preemption thread interrupted! Exiting.");
         LOG.info("Preemption thread interrupted! Exiting.");
         return;
         return;
@@ -77,55 +78,57 @@ class FSPreemptionThread extends Thread {
   }
   }
 
 
   /**
   /**
-   * Given an app, identify containers to preempt to satisfy the app's next
-   * resource request.
+   * Given an app, identify containers to preempt to satisfy the app's
+   * starvation.
+   *
+   * Mechanics:
+   * 1. Fetch all {@link ResourceRequest}s corresponding to the amount of
+   * starvation.
+   * 2. For each {@link ResourceRequest}, iterate through matching
+   * nodes and identify containers to preempt all on one node, also
+   * optimizing for least number of AM container preemptions.
    *
    *
    * @param starvedApp starved application for which we are identifying
    * @param starvedApp starved application for which we are identifying
    *                   preemption targets
    *                   preemption targets
-   * @return list of containers to preempt to satisfy starvedApp, null if the
-   * app cannot be satisfied by preempting any running containers
+   * @return list of containers to preempt to satisfy starvedApp
    */
    */
-  private PreemptableContainers identifyContainersToPreempt(
+  private List<RMContainer> identifyContainersToPreempt(
       FSAppAttempt starvedApp) {
       FSAppAttempt starvedApp) {
-    PreemptableContainers bestContainers = null;
+    List<RMContainer> containersToPreempt = new ArrayList<>();
 
 
-    // Find the nodes that match the next resource request
-    ResourceRequest request = starvedApp.getNextResourceRequest();
-    // TODO (KK): Should we check other resource requests if we can't match
-    // the first one?
-
-    Resource requestCapability = request.getCapability();
-    List<FSSchedulerNode> potentialNodes =
-        scheduler.getNodeTracker().getNodesByResourceName(
-            request.getResourceName());
+    // Iterate through enough RRs to address app's starvation
+    for (ResourceRequest rr : starvedApp.getStarvedResourceRequests()) {
+      for (int i = 0; i < rr.getNumContainers(); i++) {
+        PreemptableContainers bestContainers = null;
+        List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
+            .getNodesByResourceName(rr.getResourceName());
+        for (FSSchedulerNode node : potentialNodes) {
+          // TODO (YARN-5829): Attempt to reserve the node for starved app.
+          if (isNodeAlreadyReserved(node, starvedApp)) {
+            continue;
+          }
 
 
-    // From the potential nodes, pick a node that has enough containers
-    // from apps over their fairshare
-    for (FSSchedulerNode node : potentialNodes) {
-      // TODO (YARN-5829): Attempt to reserve the node for starved app. The
-      // subsequent if-check needs to be reworked accordingly.
-      FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
-      if (nodeReservedApp != null && !nodeReservedApp.equals(starvedApp)) {
-        // This node is already reserved by another app. Let us not consider
-        // this for preemption.
-        continue;
-      }
+          int maxAMContainers = bestContainers == null ?
+              Integer.MAX_VALUE : bestContainers.numAMContainers;
+          PreemptableContainers preemptableContainers =
+              identifyContainersToPreemptOnNode(
+                  rr.getCapability(), node, maxAMContainers);
+          if (preemptableContainers != null) {
+            // This set is better than any previously identified set.
+            bestContainers = preemptableContainers;
+            if (preemptableContainers.numAMContainers == 0) {
+              break;
+            }
+          }
+        } // End of iteration through nodes for one RR
 
 
-      int maxAMContainers = bestContainers == null ?
-          Integer.MAX_VALUE : bestContainers.numAMContainers;
-      PreemptableContainers preemptableContainers =
-          identifyContainersToPreemptOnNode(requestCapability, node,
-              maxAMContainers);
-      if (preemptableContainers != null) {
-        if (preemptableContainers.numAMContainers == 0) {
-          return preemptableContainers;
-        } else {
-          bestContainers = preemptableContainers;
+        if (bestContainers != null && bestContainers.containers.size() > 0) {
+          containersToPreempt.addAll(bestContainers.containers);
+          trackPreemptionsAgainstNode(bestContainers.containers);
         }
         }
       }
       }
-    }
-
-    return bestContainers;
+    } // End of iteration over RRs
+    return containersToPreempt;
   }
   }
 
 
   /**
   /**
@@ -176,23 +179,25 @@ class FSPreemptionThread extends Thread {
     return null;
     return null;
   }
   }
 
 
-  private void preemptContainers(List<RMContainer> containers) {
-    // Mark the containers as being considered for preemption on the node.
-    // Make sure the containers are subsequently removed by calling
-    // FSSchedulerNode#removeContainerForPreemption.
-    if (containers.size() > 0) {
-      FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
-          .getNode(containers.get(0).getNodeId());
-      node.addContainersForPreemption(containers);
-    }
+  private boolean isNodeAlreadyReserved(
+      FSSchedulerNode node, FSAppAttempt app) {
+    FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
+    return nodeReservedApp != null && !nodeReservedApp.equals(app);
+  }
 
 
+  private void trackPreemptionsAgainstNode(List<RMContainer> containers) {
+    FSSchedulerNode node = (FSSchedulerNode) scheduler.getNodeTracker()
+        .getNode(containers.get(0).getNodeId());
+    node.addContainersForPreemption(containers);
+  }
+
+  private void preemptContainers(List<RMContainer> containers) {
     // Warn application about containers to be killed
     // Warn application about containers to be killed
     for (RMContainer container : containers) {
     for (RMContainer container : containers) {
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
       ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
       FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
       FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
-      FSLeafQueue queue = app.getQueue();
       LOG.info("Preempting container " + container +
       LOG.info("Preempting container " + container +
-          " from queue " + queue.getName());
+          " from queue " + app.getQueueName());
       app.trackContainerForPreemption(container);
       app.trackContainerForPreemption(container);
     }
     }
 
 

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -1772,4 +1772,8 @@ public class FairScheduler extends
   public float getReservableNodesRatio() {
   public float getReservableNodesRatio() {
     return reservableNodesRatio;
     return reservableNodesRatio;
   }
   }
+
+  long getNMHeartbeatInterval() {
+    return nmHeartbeatInterval;
+  }
 }
 }

+ 18 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -114,12 +114,24 @@ public class FairSchedulerConfiguration extends Configuration {
   protected static final String PREEMPTION_THRESHOLD =
   protected static final String PREEMPTION_THRESHOLD =
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
       CONF_PREFIX + "preemption.cluster-utilization-threshold";
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
   protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
-  
-  protected static final String PREEMPTION_INTERVAL = CONF_PREFIX + "preemptionInterval";
-  protected static final int DEFAULT_PREEMPTION_INTERVAL = 5000;
+
   protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
   protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
   protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
 
 
+  /**
+   * Configurable delay (ms) before an app's starvation is considered after
+   * it is identified. This is to give the scheduler enough time to
+   * allocate containers post preemption. This delay is added to the
+   * {@link #WAIT_TIME_BEFORE_KILL} and enough heartbeats.
+   *
+   * This is intended to be a backdoor on production clusters, and hence
+   * intentionally not documented.
+   */
+  protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
+      CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
+  protected static final long
+      DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
+
   /** Whether to assign multiple containers in one check-in. */
   /** Whether to assign multiple containers in one check-in. */
   public static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
   public static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
   protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
   protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
@@ -251,8 +263,9 @@ public class FairSchedulerConfiguration extends Configuration {
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
     		"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
   }
   }
   
   
-  public int getPreemptionInterval() {
-    return getInt(PREEMPTION_INTERVAL, DEFAULT_PREEMPTION_INTERVAL);
+  public long getWaitTimeBeforeNextStarvationCheck() {
+    return getLong(WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
+        DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
   }
   }
   
   
   public int getWaitTimeBeforeKill() {
   public int getWaitTimeBeforeKill() {

+ 146 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/VisitedResourceRequestTracker.java

@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Applications place {@link ResourceRequest}s at multiple levels. This is a
+ * helper class that allows tracking if a {@link ResourceRequest} has been
+ * visited at a different locality level.
+ *
+ * This is implemented for {@link FSAppAttempt#getStarvedResourceRequests()}.
+ * The implementation is not thread-safe.
+ */
+class VisitedResourceRequestTracker {
+  private static final Log LOG =
+      LogFactory.getLog(VisitedResourceRequestTracker.class);
+  private final Map<Priority, Map<Resource, TrackerPerPriorityResource>> map =
+      new HashMap<>();
+  private final ClusterNodeTracker<FSSchedulerNode> nodeTracker;
+
+  VisitedResourceRequestTracker(
+      ClusterNodeTracker<FSSchedulerNode> nodeTracker) {
+    this.nodeTracker = nodeTracker;
+  }
+
+  /**
+   * Check if the {@link ResourceRequest} is visited before, and track it.
+   * @param rr {@link ResourceRequest} to visit
+   * @return true if <code>rr</code> is the first visit across all
+   * locality levels, false otherwise
+   */
+  boolean visit(ResourceRequest rr) {
+    Priority priority = rr.getPriority();
+    Resource capability = rr.getCapability();
+
+    Map<Resource, TrackerPerPriorityResource> subMap = map.get(priority);
+    if (subMap == null) {
+      subMap = new HashMap<>();
+      map.put(priority, subMap);
+    }
+
+    TrackerPerPriorityResource tracker = subMap.get(capability);
+    if (tracker == null) {
+      tracker = new TrackerPerPriorityResource();
+      subMap.put(capability, tracker);
+    }
+
+    return tracker.visit(rr.getResourceName());
+  }
+
+  private class TrackerPerPriorityResource {
+    private Set<String> racksWithNodesVisited = new HashSet<>();
+    private Set<String> racksVisted = new HashSet<>();
+    private boolean anyVisited;
+
+    private boolean visitAny() {
+      if (racksVisted.isEmpty() && racksWithNodesVisited.isEmpty()) {
+        anyVisited = true;
+      }
+      return anyVisited;
+    }
+
+    private boolean visitRack(String rackName) {
+      if (anyVisited || racksWithNodesVisited.contains(rackName)) {
+        return false;
+      } else {
+        racksVisted.add(rackName);
+        return true;
+      }
+    }
+
+    private boolean visitNode(String rackName) {
+      if (anyVisited || racksVisted.contains(rackName)) {
+        return false;
+      } else {
+        racksWithNodesVisited.add(rackName);
+        return true;
+      }
+    }
+
+    /**
+     * Based on whether <code>resourceName</code> is a node, rack or ANY,
+     * check if this has been visited earlier.
+     *
+     * A node is considered visited if its rack or ANY have been visited.
+     * A rack is considered visited if any nodes or ANY have been visited.
+     * Any is considered visited if any of the nodes/racks have been visited.
+     *
+     * @param resourceName nodename or rackname or ANY
+     * @return true if this is the first visit, false otherwise
+     */
+    private boolean visit(String resourceName) {
+      if (resourceName.equals(ResourceRequest.ANY)) {
+        return visitAny();
+      }
+
+      List<FSSchedulerNode> nodes =
+          nodeTracker.getNodesByResourceName(resourceName);
+      int numNodes = nodes.size();
+      if (numNodes == 0) {
+        LOG.error("Found ResourceRequest for a non-existent node/rack named " +
+            resourceName);
+        return false;
+      }
+
+      if (numNodes == 1) {
+        // Found a single node. To be safe, let us verify it is a node and
+        // not a rack with a single node.
+        FSSchedulerNode node = nodes.get(0);
+        if (node.getNodeName().equals(resourceName)) {
+          return visitNode(node.getRackName());
+        }
+      }
+
+      // At this point, it is not ANY or a node. Must be a rack
+      return visitRack(resourceName);
+    }
+  }
+}

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerWithMockPreemption.java

@@ -21,6 +21,8 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
 
 
 public class FairSchedulerWithMockPreemption extends FairScheduler {
 public class FairSchedulerWithMockPreemption extends FairScheduler {
+  static final long DELAY_FOR_NEXT_STARVATION_CHECK_MS = 10 * 60 * 1000;
+
   @Override
   @Override
   protected void createPreemptionThread() {
   protected void createPreemptionThread() {
     preemptionThread = new MockPreemptionThread(this);
     preemptionThread = new MockPreemptionThread(this);
@@ -30,7 +32,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
     private Set<FSAppAttempt> appsAdded = new HashSet<>();
     private Set<FSAppAttempt> appsAdded = new HashSet<>();
     private int totalAppsAdded = 0;
     private int totalAppsAdded = 0;
 
 
-    MockPreemptionThread(FairScheduler scheduler) {
+    private MockPreemptionThread(FairScheduler scheduler) {
       super(scheduler);
       super(scheduler);
     }
     }
 
 
@@ -41,6 +43,7 @@ public class FairSchedulerWithMockPreemption extends FairScheduler {
           FSAppAttempt app = context.getStarvedApps().take();
           FSAppAttempt app = context.getStarvedApps().take();
           appsAdded.add(app);
           appsAdded.add(app);
           totalAppsAdded++;
           totalAppsAdded++;
+          app.preemptionTriggered(DELAY_FOR_NEXT_STARVATION_CHECK_MS);
         } catch (InterruptedException e) {
         } catch (InterruptedException e) {
           return;
           return;
         }
         }

+ 15 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppStarvation.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.ControlledClock;
 
 
 import org.junit.After;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -43,6 +44,8 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
 
 
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-QUEUES");
 
 
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final String[] QUEUES =
   private static final String[] QUEUES =
@@ -99,11 +102,17 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
             + "minshare and fairshare queues",
             + "minshare and fairshare queues",
         3, preemptionThread.uniqueAppsAdded());
         3, preemptionThread.uniqueAppsAdded());
 
 
-    // Verify the apps get added again on a subsequent update
+    // Verify apps are added again only after the set delay for starvation has
+    // passed.
+    clock.tickSec(1);
     scheduler.update();
     scheduler.update();
-    Thread.yield();
-
+    assertEquals("Apps re-added even before starvation delay passed",
+        preemptionThread.totalAppsAdded(), preemptionThread.uniqueAppsAdded());
     verifyLeafQueueStarvation();
     verifyLeafQueueStarvation();
+
+    clock.tickMsec(
+        FairSchedulerWithMockPreemption.DELAY_FOR_NEXT_STARVATION_CHECK_MS);
+    scheduler.update();
     assertTrue("Each app is marked as starved exactly once",
     assertTrue("Each app is marked as starved exactly once",
         preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
         preemptionThread.totalAppsAdded() > preemptionThread.uniqueAppsAdded());
   }
   }
@@ -141,7 +150,7 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
     sendEnoughNodeUpdatesToAssignFully();
     sendEnoughNodeUpdatesToAssignFully();
 
 
     // Sleep to hit the preemption timeouts
     // Sleep to hit the preemption timeouts
-    Thread.sleep(10);
+    clock.tickMsec(10);
 
 
     // Scheduler update to populate starved apps
     // Scheduler update to populate starved apps
     scheduler.update();
     scheduler.update();
@@ -208,8 +217,9 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
         ALLOC_FILE.exists());
         ALLOC_FILE.exists());
 
 
     resourceManager = new MockRM(conf);
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
     preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
     preemptionThread = (FairSchedulerWithMockPreemption.MockPreemptionThread)
         scheduler.preemptionThread;
         scheduler.preemptionThread;
 
 

+ 23 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.util.ControlledClock;
 import org.junit.After;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertFalse;
@@ -49,6 +50,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final File ALLOC_FILE = new File(TEST_DIR, "test-queues");
   private static final int GB = 1024;
   private static final int GB = 1024;
 
 
+  // Scheduler clock
+  private final ControlledClock clock = new ControlledClock();
+
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   // Node Capacity = NODE_CAPACITY_MULTIPLE * (1 GB or 1 vcore)
   private static final int NODE_CAPACITY_MULTIPLE = 4;
   private static final int NODE_CAPACITY_MULTIPLE = 4;
 
 
@@ -60,25 +64,28 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
   // Starving app that is expected to instigate preemption
   // Starving app that is expected to instigate preemption
   private FSAppAttempt starvingApp;
   private FSAppAttempt starvingApp;
 
 
-  @Parameterized.Parameters
-  public static Collection<Boolean[]> getParameters() {
-    return Arrays.asList(new Boolean[][] {
-        {true}, {false}});
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> getParameters() {
+    return Arrays.asList(new Object[][] {
+        {"FairSharePreemption", true},
+        {"MinSharePreemption", false}});
   }
   }
 
 
-  public TestFairSchedulerPreemption(Boolean fairshare) throws IOException {
+  public TestFairSchedulerPreemption(String name, boolean fairshare)
+      throws IOException {
     fairsharePreemption = fairshare;
     fairsharePreemption = fairshare;
     writeAllocFile();
     writeAllocFile();
   }
   }
 
 
   @Before
   @Before
-  public void setup() {
+  public void setup() throws IOException {
     createConfiguration();
     createConfiguration();
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
         ALLOC_FILE.getAbsolutePath());
         ALLOC_FILE.getAbsolutePath());
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
     conf.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 0);
+    setupCluster();
   }
   }
 
 
   @After
   @After
@@ -166,8 +173,9 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
 
   private void setupCluster() throws IOException {
   private void setupCluster() throws IOException {
     resourceManager = new MockRM(conf);
     resourceManager = new MockRM(conf);
-    resourceManager.start();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
     scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+    scheduler.setClock(clock);
+    resourceManager.start();
 
 
     // Create and add two nodes to the cluster
     // Create and add two nodes to the cluster
     addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
     addNode(NODE_CAPACITY_MULTIPLE * GB, NODE_CAPACITY_MULTIPLE);
@@ -197,7 +205,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    *
    *
    * @param queueName queue name
    * @param queueName queue name
    */
    */
-  private void takeAllResource(String queueName) {
+  private void takeAllResources(String queueName) {
     // Create an app that takes up all the resources on the cluster
     // Create an app that takes up all the resources on the cluster
     ApplicationAttemptId appAttemptId
     ApplicationAttemptId appAttemptId
         = createSchedulingRequest(GB, 1, queueName, "default",
         = createSchedulingRequest(GB, 1, queueName, "default",
@@ -227,8 +235,8 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
         NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
     starvingApp = scheduler.getSchedulerApp(appAttemptId);
     starvingApp = scheduler.getSchedulerApp(appAttemptId);
 
 
-    // Sleep long enough to pass
-    Thread.sleep(10);
+    // Move clock enough to identify starvation
+    clock.tickSec(1);
     scheduler.update();
     scheduler.update();
   }
   }
 
 
@@ -243,14 +251,13 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
    */
    */
   private void submitApps(String queue1, String queue2)
   private void submitApps(String queue1, String queue2)
       throws InterruptedException {
       throws InterruptedException {
-    takeAllResource(queue1);
+    takeAllResources(queue1);
     preemptHalfResources(queue2);
     preemptHalfResources(queue2);
   }
   }
 
 
   private void verifyPreemption() throws InterruptedException {
   private void verifyPreemption() throws InterruptedException {
-    // Sleep long enough for four containers to be preempted. Note that the
-    // starved app must be queued four times for containers to be preempted.
-    for (int i = 0; i < 10000; i++) {
+    // Sleep long enough for four containers to be preempted.
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() == 4) {
       if (greedyApp.getLiveContainers().size() == 4) {
         break;
         break;
       }
       }
@@ -268,7 +275,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
 
   private void verifyNoPreemption() throws InterruptedException {
   private void verifyNoPreemption() throws InterruptedException {
     // Sleep long enough to ensure not even one container is preempted.
     // Sleep long enough to ensure not even one container is preempted.
-    for (int i = 0; i < 600; i++) {
+    for (int i = 0; i < 100; i++) {
       if (greedyApp.getLiveContainers().size() != 8) {
       if (greedyApp.getLiveContainers().size() != 8) {
         break;
         break;
       }
       }
@@ -279,7 +286,6 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
 
   @Test
   @Test
   public void testPreemptionWithinSameLeafQueue() throws Exception {
   public void testPreemptionWithinSameLeafQueue() throws Exception {
-    setupCluster();
     String queue = "root.preemptable.child-1";
     String queue = "root.preemptable.child-1";
     submitApps(queue, queue);
     submitApps(queue, queue);
     if (fairsharePreemption) {
     if (fairsharePreemption) {
@@ -291,21 +297,18 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
 
   @Test
   @Test
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
   public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     submitApps("root.preemptable.child-1", "root.preemptable.child-2");
     verifyPreemption();
     verifyPreemption();
   }
   }
 
 
   @Test
   @Test
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
   public void testPreemptionBetweenNonSiblingQueues() throws Exception {
-    setupCluster();
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
     verifyPreemption();
     verifyPreemption();
   }
   }
 
 
   @Test
   @Test
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
   public void testNoPreemptionFromDisallowedQueue() throws Exception {
-    setupCluster();
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     submitApps("root.nonpreemptable.child-1", "root.preemptable.child-1");
     verifyNoPreemption();
     verifyNoPreemption();
   }
   }
@@ -331,9 +334,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
 
 
   @Test
   @Test
   public void testPreemptionSelectNonAMContainer() throws Exception {
   public void testPreemptionSelectNonAMContainer() throws Exception {
-    setupCluster();
-
-    takeAllResource("root.preemptable.child-1");
+    takeAllResources("root.preemptable.child-1");
     setNumAMContainersPerNode(2);
     setNumAMContainersPerNode(2);
     preemptHalfResources("root.preemptable.child-2");
     preemptHalfResources("root.preemptable.child-2");
 
 

+ 112 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestVisitedResourceRequestTracker.java

@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at*
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestVisitedResourceRequestTracker {
+  private final ClusterNodeTracker<FSSchedulerNode>
+      nodeTracker = new ClusterNodeTracker<>();
+  private final ResourceRequest
+      anyRequest, rackRequest, node1Request, node2Request;
+
+  private final String NODE_VISITED = "The node is already visited. ";
+  private final String RACK_VISITED = "The rack is already visited. ";
+  private final String ANY_VISITED = "ANY is already visited. ";
+  private final String NODE_FAILURE = "The node is visited again.";
+  private final String RACK_FAILURE = "The rack is visited again.";
+  private final String ANY_FAILURE = "ANY is visited again.";
+  private final String FIRST_CALL_FAILURE = "First call to visit failed.";
+
+  public TestVisitedResourceRequestTracker() {
+    List<RMNode> rmNodes =
+        MockNodes.newNodes(1, 2, Resources.createResource(8192, 8));
+
+    FSSchedulerNode node1 = new FSSchedulerNode(rmNodes.get(0), false);
+    nodeTracker.addNode(node1);
+    node1Request = createRR(node1.getNodeName(), 1);
+
+    FSSchedulerNode node2 = new FSSchedulerNode(rmNodes.get(1), false);
+    node2Request = createRR(node2.getNodeName(), 1);
+    nodeTracker.addNode(node2);
+
+    anyRequest = createRR(ResourceRequest.ANY, 2);
+    rackRequest = createRR(node1.getRackName(), 2);
+  }
+
+  private ResourceRequest createRR(String resourceName, int count) {
+    return ResourceRequest.newInstance(
+        Priority.UNDEFINED, resourceName, Resources.none(), count);
+  }
+
+  @Test
+  public void testVisitAnyRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit ANY request first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(anyRequest));
+
+    // All other requests should return false
+    assertFalse(ANY_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
+    assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node1Request));
+    assertFalse(ANY_VISITED + NODE_FAILURE, tracker.visit(node2Request));
+  }
+
+  @Test
+  public void testVisitRackRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit rack request first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(rackRequest));
+
+    // All other requests should return false
+    assertFalse(RACK_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
+    assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node1Request));
+    assertFalse(RACK_VISITED + NODE_FAILURE, tracker.visit(node2Request));
+  }
+
+  @Test
+  public void testVisitNodeRequestFirst() {
+    VisitedResourceRequestTracker tracker =
+        new VisitedResourceRequestTracker(nodeTracker);
+
+    // Visit node1 first
+    assertTrue(FIRST_CALL_FAILURE, tracker.visit(node1Request));
+
+    // Rack and ANY should return false
+    assertFalse(NODE_VISITED + ANY_FAILURE, tracker.visit(anyRequest));
+    assertFalse(NODE_VISITED + RACK_FAILURE, tracker.visit(rackRequest));
+
+    // The other node should return true
+    assertTrue(NODE_VISITED + "Different node visit failed",
+        tracker.visit(node2Request));
+  }
+}