
Revert "YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"

This reverts commit eda4ac07c1835031aca7e27cc673f1c5913813bb.
Commit eda4ac07c1835031aca7e27cc673f1c5913813bb was a separate patch from trunk rather than a cherry-pick. I will cherry-pick the dependencies and then cherry-pick the trunk commit for YARN-2113.
Eric Payne 7 years ago
parent
commit
e6cdf770ca
19 files changed, 173 additions and 1738 deletions
  1. + 0 - 5 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
  2. + 0 - 5 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
  3. + 0 - 9 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
  4. + 0 - 5 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
  5. + 0 - 5 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
  6. + 4 - 5 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
  7. + 96 - 233 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
  8. + 34 - 78 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
  9. + 1 - 9 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
  10. + 1 - 24 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  11. + 8 - 16 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
  12. + 0 - 14 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
  13. + 0 - 88 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
  14. + 0 - 8 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  15. + 13 - 54 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  16. + 12 - 77 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
  17. + 4 - 26 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
  18. + 0 - 899 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
  19. + 0 - 178 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java

+ 0 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java

@@ -121,9 +121,4 @@ public class DefaultResourceCalculator extends ResourceCalculator {
       Resource smaller, Resource bigger) {
     return smaller.getMemorySize() <= bigger.getMemorySize();
   }
-
-  @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    return resource.getMemorySize() == 0f;
-  }
 }

+ 0 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java

@@ -231,9 +231,4 @@ public class DominantResourceCalculator extends ResourceCalculator {
     return smaller.getMemorySize() <= bigger.getMemorySize()
         && smaller.getVirtualCores() <= bigger.getVirtualCores();
   }
-
-  @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
-  }
 }

+ 0 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java

@@ -204,13 +204,4 @@ public abstract class ResourceCalculator {
    */
   public abstract boolean fitsIn(Resource cluster,
       Resource smaller, Resource bigger);
-
-  /**
-   * Check if resource has any major resource types (which are all NodeManagers
-   * included) a zero value.
-   *
-   * @param resource resource
-   * @return returns true if any resource is zero.
-   */
-  public abstract boolean isAnyMajorResourceZero(Resource resource);
 }

+ 0 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java

@@ -345,9 +345,4 @@ public class Resources {
     return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
         Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
-
-  public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
-      Resource resource) {
-    return rc.isAnyMajorResourceZero(resource);
-  }
 }

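For reference, the four hunks above remove the "is any major resource zero" helper end to end. A minimal stand-alone sketch (not the Hadoop source, method names hypothetical) of the semantics being deleted: DefaultResourceCalculator treats memory as the only major resource, while DominantResourceCalculator treats both memory and vcores as major.

public class ZeroCheckSketch {

  // DefaultResourceCalculator variant: memory is the only major resource.
  static boolean anyMajorResourceZeroDefault(long memoryMb) {
    return memoryMb == 0;
  }

  // DominantResourceCalculator variant: memory and vcores are both major.
  static boolean anyMajorResourceZeroDominant(long memoryMb, int vcores) {
    return memoryMb == 0 || vcores == 0;
  }

  public static void main(String[] args) {
    System.out.println(anyMajorResourceZeroDefault(0));        // true
    System.out.println(anyMajorResourceZeroDominant(2048, 0)); // true
  }
}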
+ 0 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java

@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
@@ -65,7 +63,4 @@ interface CapacitySchedulerPreemptionContext {
   float getMinimumThresholdForIntraQueuePreemption();
 
   float getMaxAllowableLimitForIntraQueuePreemption();
-
-  @Unstable
-  IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
 }

+ 4 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java

@@ -99,7 +99,7 @@ public class CapacitySchedulerPreemptionUtils {
           }
 
           deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
-              tas, res);
+              tas, res, partition);
         }
       }
     }
@@ -108,10 +108,10 @@ public class CapacitySchedulerPreemptionUtils {
   private static void deductPreemptableResourcePerApp(
       CapacitySchedulerPreemptionContext context,
       Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
-      Resource res) {
+      Resource res, String partition) {
     for (TempAppPerPartition ta : tas) {
       ta.deductActuallyToBePreempted(context.getResourceCalculator(),
-          totalPartitionResource, res);
+          totalPartitionResource, res, partition);
     }
   }
 
@@ -157,8 +157,7 @@ public class CapacitySchedulerPreemptionUtils {
         && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
             Resources.none())
         && Resources.fitsIn(rc, clusterResource,
-            rmContainer.getAllocatedResource(), totalPreemptionAllowed)
-        && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
+            rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
       Resources.subtractFrom(toObtainByPartition,
           rmContainer.getAllocatedResource());
       Resources.subtractFrom(totalPreemptionAllowed,

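The last hunk above also drops the !Resources.isAnyMajorResourceZero(...) clause from the container-selection test. A hedged sketch of the post-revert condition, reusing only the Resources and ResourceCalculator calls visible in the hunk; the class and method here are illustrative, not Hadoop source:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class SelectionSketch {

  // Post-revert test: a container is taken while something is still to be
  // obtained for the partition and the container fits in the remaining
  // global preemption budget; both totals are then reduced in place.
  static boolean trySelect(ResourceCalculator rc, Resource cluster,
      Resource toObtain, Resource allocated, Resource budget) {
    if (Resources.greaterThan(rc, cluster, toObtain, Resources.none())
        && Resources.fitsIn(rc, cluster, allocated, budget)) {
      Resources.subtractFrom(toObtain, allocated);
      Resources.subtractFrom(budget, allocated);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    // Invented numbers for illustration only.
    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource cluster = Resources.createResource(100 * 1024, 100);
    Resource toObtain = Resources.createResource(4 * 1024, 4);
    Resource budget = Resources.createResource(8 * 1024, 8);
    Resource allocated = Resources.createResource(2 * 1024, 2);
    System.out.println(trySelect(rc, cluster, toObtain, allocated, budget)); // true
  }
}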
+ 96 - 233
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java

@@ -18,13 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -35,11 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -64,26 +59,6 @@ public class FifoIntraQueuePreemptionPlugin
     this.rc = rc;
   }
 
-  @Override
-  public Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
-      String partition) {
-    TempQueuePerPartition tq = context.getQueueByPartition(queueName,
-        partition);
-
-    List<FiCaSchedulerApp> apps = new ArrayList<FiCaSchedulerApp>();
-    for (TempAppPerPartition tmpApp : tq.getApps()) {
-      // If a lower priority app was not selected to get preempted, mark such
-      // apps out from preemption candidate selection.
-      if (Resources.equals(tmpApp.getActuallyToBePreempted(),
-          Resources.none())) {
-        continue;
-      }
-
-      apps.add(tmpApp.app);
-    }
-    return apps;
-  }
-
   @Override
   public Map<String, Resource> getResourceDemandFromAppsPerQueue(
       String queueName, String partition) {
@@ -114,7 +89,7 @@
 
   @Override
   public void computeAppsIdealAllocation(Resource clusterResource,
-      TempQueuePerPartition tq,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed,
       Resource queueReassignableResource, float maxAllowablePreemptLimit) {
@@ -137,15 +112,17 @@
 
     // 3. Create all tempApps for internal calculation and return a list from
     // high priority to low priority order.
-    PriorityQueue<TempAppPerPartition> orderedByPriority = createTempAppForResCalculation(
-        tq, apps, clusterResource, perUserAMUsed);
+    TAPriorityComparator taComparator = new TAPriorityComparator();
+    PriorityQueue<TempAppPerPartition> orderedByPriority =
+        createTempAppForResCalculation(tq.partition, apps, taComparator);
 
     // 4. Calculate idealAssigned per app by checking based on queue's
     // unallocated resource.Also return apps arranged from lower priority to
     // higher priority.
-    TreeSet<TempAppPerPartition> orderedApps = calculateIdealAssignedResourcePerApp(
-        clusterResource, tq, selectedCandidates, queueReassignableResource,
-        orderedByPriority);
+    TreeSet<TempAppPerPartition> orderedApps =
+        calculateIdealAssignedResourcePerApp(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            queueReassignableResource, orderedByPriority, perUserAMUsed);
 
     // 5. A configurable limit that could define an ideal allowable preemption
     // limit. Based on current queue's capacity,defined how much % could become
@@ -168,7 +145,7 @@ public class FifoIntraQueuePreemptionPlugin
     // 7. From lowest priority app onwards, calculate toBePreempted resource
     // based on demand.
     calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
-        Resources.clone(preemptionLimit));
+        preemptionLimit);
 
     // Save all apps (low to high) to temp queue for further reference
     tq.addAllApps(orderedApps);
@@ -176,8 +153,7 @@ public class FifoIntraQueuePreemptionPlugin
     // 8. There are chances that we may preempt for the demand from same
     // priority level, such cases are to be validated out.
     validateOutSameAppPriorityFromDemand(clusterResource,
-        (TreeSet<TempAppPerPartition>) orderedApps, tq.getUsersPerPartition(),
-        context.getIntraQueuePreemptionOrderPolicy());
+        (TreeSet<TempAppPerPartition>) tq.getApps());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -200,17 +176,17 @@
 
       Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
           tmpApp.idealAssigned);
-      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected);
-      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed());
+      Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
+      Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
 
       // Calculate toBePreempted from apps as follows:
       // app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
       // intra_q_preemptable)
       tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
           .max(rc, clusterResource, preemtableFromApp, Resources.none()),
-          Resources.clone(preemptionLimit));
+          preemptionLimit);
 
-      preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit,
+      preemptionLimit = Resources.subtract(preemptionLimit,
           tmpApp.toBePreempted);
     }
   }
@@ -245,24 +221,31 @@ public class FifoIntraQueuePreemptionPlugin
    * }
    *  
    * @param clusterResource Cluster Resource
+   * @param partitionBasedResource resource per partition
    * @param tq TempQueue
    * @param selectedCandidates Already Selected preemption candidates
    * @param queueReassignableResource Resource used in a queue
    * @param orderedByPriority List of running apps
+   * @param perUserAMUsed AM used resource
    * @return List of temp apps ordered from low to high priority
    */
   private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
-      Resource clusterResource, TempQueuePerPartition tq,
+      Resource clusterResource, Resource partitionBasedResource,
+      TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource queueReassignableResource,
-      PriorityQueue<TempAppPerPartition> orderedByPriority) {
+      PriorityQueue<TempAppPerPartition> orderedByPriority,
+      Map<String, Resource> perUserAMUsed) {
 
     Comparator<TempAppPerPartition> reverseComp = Collections
         .reverseOrder(new TAPriorityComparator());
     TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
 
+    Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
     String partition = tq.partition;
-    Map<String, TempUserPerPartition> usersPerPartition = tq.getUsersPerPartition();
+
+    Map<String, Resource> preCalculatedUserLimit =
+        new HashMap<String, Resource>();
 
     while (!orderedByPriority.isEmpty()) {
       // Remove app from the next highest remaining priority and process it to
@@ -272,19 +255,43 @@
 
       // Once unallocated resource is 0, we can stop assigning ideal per app.
       if (Resources.lessThanOrEqual(rc, clusterResource,
-          queueReassignableResource, Resources.none())
-          || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
+          queueReassignableResource, Resources.none())) {
         continue;
       }
 
       String userName = tmpApp.app.getUser();
-      TempUserPerPartition tmpUser = usersPerPartition.get(userName);
-      Resource userLimitResource = tmpUser.getUserLimit();
-      Resource idealAssignedForUser = tmpUser.idealAssigned;
+      Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+      // Verify whether we already calculated headroom for this user.
+      if (userLimitResource == null) {
+        userLimitResource = Resources.clone(tq.leafQueue
+            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+        Resource amUsed = perUserAMUsed.get(userName);
+        if (null == amUsed) {
+          amUsed = Resources.createResource(0, 0);
+        }
+
+        // Real AM used need not have to be considered for user-limit as well.
+        userLimitResource = Resources.subtract(userLimitResource, amUsed);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Userlimit for user '" + userName + "' is :"
+              + userLimitResource + ", and amUsed is:" + amUsed);
+        }
+
+        preCalculatedUserLimit.put(userName, userLimitResource);
+      }
+
+      Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+      if (idealAssignedForUser == null) {
+        idealAssignedForUser = Resources.createResource(0, 0);
+        userIdealAssignedMapping.put(userName, idealAssignedForUser);
+      }
 
       // Calculate total selected container resources from current app.
-      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp,
-          tmpUser, partition);
+      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+          tmpApp, partition);
 
       // For any app, used+pending will give its idealAssigned. However it will
       // be tightly linked to queue's unallocated quota. So lower priority apps
@@ -295,11 +302,10 @@
 
       if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
           userLimitResource)) {
-        Resource idealAssigned = Resources.min(rc, clusterResource,
-            appIdealAssigned,
+        appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
             Resources.subtract(userLimitResource, idealAssignedForUser));
         tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
-            clusterResource, queueReassignableResource, idealAssigned));
+            clusterResource, queueReassignableResource, appIdealAssigned));
         Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
       } else {
         continue;
@@ -314,8 +320,7 @@ public class FifoIntraQueuePreemptionPlugin
             Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
       }
 
-      Resources.subtractFromNonNegative(queueReassignableResource,
-          tmpApp.idealAssigned);
+      Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
     }
 
     return orderedApps;
@@ -327,8 +332,7 @@ public class FifoIntraQueuePreemptionPlugin
    */
   private void getAlreadySelectedPreemptionCandidatesResource(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
-      TempAppPerPartition tmpApp, TempUserPerPartition tmpUser,
-      String partition) {
+      TempAppPerPartition tmpApp, String partition) {
     tmpApp.selected = Resources.createResource(0, 0);
     Set<RMContainer> containers = selectedCandidates
         .get(tmpApp.app.getApplicationAttemptId());
@@ -340,23 +344,16 @@ public class FifoIntraQueuePreemptionPlugin
     for (RMContainer cont : containers) {
       if (partition.equals(cont.getNodeLabelExpression())) {
         Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
-        Resources.addTo(tmpUser.selected, cont.getAllocatedResource());
       }
     }
   }
 
   private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
-      TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
-      Resource clusterResource,
-      Map<String, Resource> perUserAMUsed) {
-    TAPriorityComparator taComparator = new TAPriorityComparator();
+      String partition, Collection<FiCaSchedulerApp> apps,
+      TAPriorityComparator taComparator) {
     PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
         100, taComparator);
 
-    String partition = tq.partition;
-    Map<String, TempUserPerPartition> usersPerPartition = tq
-        .getUsersPerPartition();
-
     // have an internal temp app structure to store intermediate data(priority)
     for (FiCaSchedulerApp app : apps) {
 
@@ -388,156 +385,56 @@ public class FifoIntraQueuePreemptionPlugin
       tmpApp.idealAssigned = Resources.createResource(0, 0);
 
       orderedByPriority.add(tmpApp);
-
-      // Create a TempUserPerPartition structure to hold more information
-      // regarding each user's entities such as UserLimit etc. This could
-      // be kept in a user to TempUserPerPartition map for further reference.
-      String userName = app.getUser();
-      if (!usersPerPartition.containsKey(userName)) {
-        ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
-            .getResourceUsage();
-
-        TempUserPerPartition tmpUser = new TempUserPerPartition(
-            tq.leafQueue.getUser(userName), tq.queueName,
-            Resources.clone(userResourceUsage.getUsed(partition)),
-            Resources.clone(perUserAMUsed.get(userName)),
-            Resources.clone(userResourceUsage.getReserved(partition)),
-            Resources.none());
-
-        Resource userLimitResource = Resources.clone(
-            tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
-                partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
-        // Real AM used need not have to be considered for user-limit as well.
-        userLimitResource = Resources.subtract(userLimitResource,
-            tmpUser.amUsed);
-        tmpUser.setUserLimit(userLimitResource);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("TempUser:" + tmpUser);
-        }
-
-        tmpUser.idealAssigned = Resources.createResource(0, 0);
-        tq.addUserPerPartition(userName, tmpUser);
-      }
     }
     return orderedByPriority;
   }
 
   /*
    * Fifo+Priority based preemption policy need not have to preempt resources at
-   * same priority level. Such cases will be validated out. But if the demand is
-   * from an app of different user, force to preempt resources even if apps are
-   * at same priority.
+   * same priority level. Such cases will be validated out.
    */
   public void validateOutSameAppPriorityFromDemand(Resource cluster,
-      TreeSet<TempAppPerPartition> orderedApps,
-      Map<String, TempUserPerPartition> usersPerPartition,
-      IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
+      TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
 
-    TempAppPerPartition[] apps = orderedApps
-        .toArray(new TempAppPerPartition[orderedApps.size()]);
+    TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+        .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
     if (apps.length <= 0) {
       return;
     }
 
-    for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) {
-
-      // Check whether high priority app with demand needs resource from other
-      // user.
-      if (Resources.greaterThan(rc, cluster,
-          apps[hPriority].getToBePreemptFromOther(), Resources.none())) {
-
-        // Given we have a demand from a high priority app, we can do a reverse
-        // scan from lower priority apps to select resources.
-        // Since idealAssigned of each app has considered user-limit, this logic
-        // will provide eventual consistency w.r.t user-limit as well.
-        for (int lPriority = 0; lPriority < apps.length; lPriority++) {
+    int lPriority = 0;
+    int hPriority = apps.length - 1;
+
+    while (lPriority < hPriority
+        && !apps[lPriority].equals(apps[hPriority])
+        && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+      Resource toPreemptFromOther = apps[hPriority]
+          .getToBePreemptFromOther();
+      Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+      Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+          actuallyToPreempt);
+
+      if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+        Resource toPreempt = Resources.min(rc, cluster,
+            toPreemptFromOther, delta);
+
+        apps[hPriority].setToBePreemptFromOther(
+            Resources.subtract(toPreemptFromOther, toPreempt));
+        apps[lPriority].setActuallyToBePreempted(
+            Resources.add(actuallyToPreempt, toPreempt));
+      }
 
-          // Check whether app with demand needs resource from other user.
-          if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
-              Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, cluster,
+          apps[lPriority].toBePreempted,
+          apps[lPriority].getActuallyToBePreempted())) {
+        lPriority++;
+        continue;
+      }
 
-            // If apps are of same user, and priority is same, then skip.
-            if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (apps[lPriority].getPriority() >= apps[hPriority]
-                    .getPriority())) {
-              continue;
-            }
-
-            if (Resources.lessThanOrEqual(rc, cluster,
-                apps[lPriority].toBePreempted,
-                apps[lPriority].getActuallyToBePreempted())
-                || Resources.equals(apps[hPriority].getToBePreemptFromOther(),
-                    Resources.none())) {
-              continue;
-            }
-
-            // Ideally if any application has a higher priority, then it can
-            // force to preempt any lower priority app from any user. However
-            // if admin enforces user-limit over priority, preemption module
-            // will not choose lower priority apps from usre's who are not yet
-            // met its user-limit.
-            TempUserPerPartition tmpUser = usersPerPartition
-                .get(apps[lPriority].getUser());
-            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (!tmpUser.isUserLimitReached(rc, cluster))
-                && (intraQueuePreemptionOrder
-                    .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) {
-              continue;
-            }
-
-            Resource toPreemptFromOther = apps[hPriority]
-                .getToBePreemptFromOther();
-            Resource actuallyToPreempt = apps[lPriority]
-                .getActuallyToBePreempted();
-
-            // A lower priority app could offer more resource to preempt, if
-            // multiple higher priority/under served users needs resources.
-            // After one iteration, we need to ensure that actuallyToPreempt is
-            // subtracted from the resource to preempt.
-            Resource preemptableFromLowerPriorityApp = Resources
-                .subtract(apps[lPriority].toBePreempted, actuallyToPreempt);
-
-            // In case of user-limit preemption, when app's are from different
-            // user and of same priority, we will do user-limit preemption if
-            // there is a demand from under UL quota app.
-            // However this under UL quota app's demand may be more.
-            // Still we should ensure that we are not doing over preemption such
-            // that only a maximum of (user's used - UL quota) could be
-            // preempted.
-            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (apps[lPriority].getPriority() == apps[hPriority]
-                    .getPriority())
-                && tmpUser.isUserLimitReached(rc, cluster)) {
-
-              Resource deltaULQuota = Resources
-                  .subtract(tmpUser.getUsedDeductAM(), tmpUser.selected);
-              Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit());
-
-              if (tmpUser.isPreemptionQuotaForULDeltaDone()) {
-                deltaULQuota = Resources.createResource(0, 0);
-              }
-
-              if (Resources.lessThan(rc, cluster, deltaULQuota,
-                  preemptableFromLowerPriorityApp)) {
-                tmpUser.updatePreemptionQuotaForULDeltaAsDone(true);
-                preemptableFromLowerPriorityApp = deltaULQuota;
-              }
-            }
-
-            if (Resources.greaterThan(rc, cluster,
-                preemptableFromLowerPriorityApp, Resources.none())) {
-              Resource toPreempt = Resources.min(rc, cluster,
-                  toPreemptFromOther, preemptableFromLowerPriorityApp);
-
-              apps[hPriority].setToBePreemptFromOther(
-                  Resources.subtract(toPreemptFromOther, toPreempt));
-              apps[lPriority].setActuallyToBePreempted(
-                  Resources.add(actuallyToPreempt, toPreempt));
-            }
-          }
-        }
+      if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+          Resources.none())) {
+        hPriority--;
+        continue;
       }
     }
   }
@@ -557,40 +454,6 @@ public class FifoIntraQueuePreemptionPlugin
       Resources.addTo(userAMResource, app.getAMResource(partition));
       Resources.addTo(amUsed, app.getAMResource(partition));
     }
-
     return amUsed;
   }
-
-  @Override
-  public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
-      Resource clusterResource, Resource usedResource, RMContainer c) {
-    // Ensure below checks
-    // 1. This check must be done only when preemption order is USERLIMIT_FIRST
-    // 2. By selecting container "c", check whether this user's resource usage
-    // is going below its user-limit.
-    // 3. Used resource of user must be always greater than user-limit to
-    // skip some containers as per this check. If used resource is under user
-    // limit, then these containers of this user has to be preempted as demand
-    // might be due to high priority apps running in same user.
-    String partition = context.getScheduler()
-        .getSchedulerNode(c.getAllocatedNode()).getPartition();
-    TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(),
-        partition);
-    TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
-
-    // Given user is not present, skip the check.
-    if (tmpUser == null) {
-      return false;
-    }
-
-    // For ideal resource computations, user-limit got saved by subtracting am
-    // used resource in TempUser. Hence it has to be added back here for
-    // complete check.
-    Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed);
-
-    return Resources.lessThanOrEqual(rc, clusterResource,
-        Resources.subtract(usedResource, c.getAllocatedResource()), userLimit)
-        && context.getIntraQueuePreemptionOrderPolicy()
-            .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST);
-  }
 }

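The restored validateOutSameAppPriorityFromDemand walks the priority-ordered apps with two pointers: lPriority offers resources from the lowest-priority end, hPriority consumes demand from the highest-priority end. A memory-only worked sketch of that transfer with invented numbers; the equals(...) guard from the source is subsumed by lPriority < hPriority here, and the priority check is kept:

public class TwoPointerSketch {
  public static void main(String[] args) {
    int[] priority = {1, 2, 3};               // apps ordered low -> high priority
    long[] toBePreempted = {3072, 1024, 0};   // offerable by each app (MB)
    long[] actuallyToBePreempted = {0, 0, 0};
    long[] toPreemptFromOther = {0, 0, 4096}; // demand of each app (MB)

    int l = 0, h = priority.length - 1;
    while (l < h && priority[l] < priority[h]) {
      // Still-unassigned offer from the low-priority app.
      long delta = toBePreempted[l] - actuallyToBePreempted[l];
      if (delta > 0) {
        long take = Math.min(toPreemptFromOther[h], delta);
        toPreemptFromOther[h] -= take;       // demand satisfied so far
        actuallyToBePreempted[l] += take;    // offer consumed so far
      }
      if (toBePreempted[l] <= actuallyToBePreempted[l]) {
        l++;                                 // low-priority app exhausted
        continue;
      }
      if (toPreemptFromOther[h] == 0) {
        h--;                                 // high-priority demand met
      }
    }
    // App 2's 4096 MB demand is met with 3072 MB from app 0 and 1024 MB from app 1.
    System.out.println(java.util.Arrays.toString(actuallyToBePreempted));
  }
}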
+ 34 - 78
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java

@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -32,13 +31,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 /**
@@ -54,14 +51,14 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         Comparator<TempAppPerPartition> {
 
     @Override
-    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
-      Priority p1 = Priority.newInstance(ta1.getPriority());
-      Priority p2 = Priority.newInstance(ta2.getPriority());
+    public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+      Priority p1 = Priority.newInstance(tq1.getPriority());
+      Priority p2 = Priority.newInstance(tq2.getPriority());
 
       if (!p1.equals(p2)) {
         return p1.compareTo(p2);
       }
-      return ta1.getApplicationId().compareTo(ta2.getApplicationId());
+      return tq1.getApplicationId().compareTo(tq2.getApplicationId());
     }
   }
 
@@ -124,60 +121,37 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
             .getResourceDemandFromAppsPerQueue(queueName, partition);
 
-        // Default preemption iterator considers only FIFO+priority. For
-        // userlimit preemption, its possible that some lower priority apps
-        // needs from high priority app of another user. Hence use apps
-        // ordered by userlimit starvation as well.
-        Collection<FiCaSchedulerApp> apps = fifoPreemptionComputePlugin
-            .getPreemptableApps(queueName, partition);
-
-        // 6. Get user-limit to ensure that we do not preempt resources which
-        // will force user's resource to come under its UL.
-        Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
-        initializeUsageAndUserLimitForCompute(clusterResource, partition,
-            leafQueue, rollingResourceUsagePerUser);
-
-        // 7. Based on the selected resource demand per partition, select
+        // 6. Based on the selected resource demand per partition, select
         // containers with known policy from inter-queue preemption.
         try {
           leafQueue.getReadLock().lock();
-          for (FiCaSchedulerApp app : apps) {
-            preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
-                clusterResource, totalPreemptedResourceAllowed,
-                resToObtainByPartition, rollingResourceUsagePerUser);
+          Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+              .getPreemptionIterator();
+          while (desc.hasNext()) {
+            FiCaSchedulerApp app = desc.next();
+            preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+                totalPreemptedResourceAllowed, resToObtainByPartition,
+                leafQueue, app);
           }
         } finally {
           leafQueue.getReadLock().unlock();
         }
       }
     }
-    return selectedCandidates;
-  }
 
-  private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
-      String partition, LeafQueue leafQueue,
-      Map<String, Resource> rollingResourceUsagePerUser) {
-    for (String user : leafQueue.getAllUsers()) {
-      // Initialize used resource of a given user for rolling computation.
-      rollingResourceUsagePerUser.put(user, Resources.clone(
-          leafQueue.getUser(user).getResourceUsage().getUsed(partition)));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Rolling resource usage for user:" + user + " is : "
-            + rollingResourceUsagePerUser.get(user));
-      }
-    }
+    return selectedCandidates;
   }
 
-  private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
-      FiCaSchedulerApp app,
+  private void preemptFromLeastStarvedApp(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
-      Map<String, Resource> resToObtainByPartition,
-      Map<String, Resource> rollingResourceUsagePerUser) {
+      Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+      FiCaSchedulerApp app) {
 
     // ToDo: Reuse reservation selector here.
 
-    List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
+    List<RMContainer> liveContainers = new ArrayList<>(
+        app.getLiveContainers());
     sortContainers(liveContainers);
 
     if (LOG.isDebugEnabled()) {
@@ -186,8 +160,6 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
               + totalPreemptedResourceAllowed);
     }
 
-    Resource rollingUsedResourcePerUser = rollingResourceUsagePerUser
-        .get(app.getUser());
     for (RMContainer c : liveContainers) {
 
       // if there are no demand, return.
@@ -212,34 +184,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         continue;
       }
 
-      // If selected container brings down resource usage under its user's
-      // UserLimit (or equals to), we must skip such containers.
-      if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app,
-          clusterResource, rollingUsedResourcePerUser, c)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Skipping container: " + c.getContainerId() + " with resource:"
-                  + c.getAllocatedResource() + " as UserLimit for user:"
-                  + app.getUser() + " with resource usage: "
-                  + rollingUsedResourcePerUser + " is going under UL");
-        }
-        break;
-      }
-
       // Try to preempt this container
-      boolean ret = CapacitySchedulerPreemptionUtils
-          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
-              resToObtainByPartition, c, clusterResource, selectedCandidates,
-              totalPreemptedResourceAllowed);
-
-      // Subtract from respective user's resource usage once a container is
-      // selected for preemption.
-      if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy()
-          .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
-        Resources.subtractFrom(rollingUsedResourcePerUser,
-            c.getAllocatedResource());
-      }
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedCandidates, totalPreemptedResourceAllowed);
     }
+
   }
 
   private void computeIntraQueuePreemptionDemand(Resource clusterResource,
@@ -255,7 +205,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         continue;
       }
 
-      // 2. loop through all queues corresponding to a partition.
+      // 2. Its better to get partition based resource limit earlier before
+      // starting calculation
+      Resource partitionBasedResource =
+          context.getPartitionResource(partition);
+
+      // 3. loop through all queues corresponding to a partition.
       for (String queueName : queueNames) {
         TempQueuePerPartition tq = context.getQueueByPartition(queueName,
             partition);
@@ -266,22 +221,23 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
           continue;
         }
 
-        // 3. Consider reassignableResource as (used - actuallyToBePreempted).
+        // 4. Consider reassignableResource as (used - actuallyToBePreempted).
         // This provides as upper limit to split apps quota in a queue.
         Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
             tq.getActuallyToBePreempted());
 
-        // 4. Check queue's used capacity. Make sure that the used capacity is
+        // 5. Check queue's used capacity. Make sure that the used capacity is
         // above certain limit to consider for intra queue preemption.
         if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
             .getMinimumThresholdForIntraQueuePreemption()) {
           continue;
         }
 
-        // 5. compute the allocation of all apps based on queue's unallocated
+        // 6. compute the allocation of all apps based on queue's unallocated
         // capacity
         fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
-            tq, selectedCandidates, totalPreemptedResourceAllowed,
+            partitionBasedResource, tq, selectedCandidates,
+            totalPreemptedResourceAllowed,
             queueReassignableResource,
             context.getMaxAllowableLimitForIntraQueuePreemption());
       }

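TAPriorityComparator above orders temp apps by Priority and breaks ties by ApplicationId, so the ordering is total and deterministic. A self-contained sketch of the same ordering contract on plain values; AppKey is a hypothetical stand-in for TempAppPerPartition:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PriorityOrderSketch {
  static final class AppKey {
    final int priority;
    final long appId;
    AppKey(int priority, long appId) {
      this.priority = priority;
      this.appId = appId;
    }
  }

  // Same contract as TAPriorityComparator: priority first, then the
  // application id as a deterministic tie-breaker.
  static final Comparator<AppKey> CMP =
      Comparator.<AppKey>comparingInt(k -> k.priority)
          .thenComparingLong(k -> k.appId);

  public static void main(String[] args) {
    List<AppKey> apps = new ArrayList<>();
    apps.add(new AppKey(2, 7));
    apps.add(new AppKey(1, 9));
    apps.add(new AppKey(1, 3));
    apps.sort(CMP); // -> (1,3), (1,9), (2,7)
    for (AppKey k : apps) {
      System.out.println("prio=" + k.priority + " app=" + k.appId);
    }
  }
}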
+ 1 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java

@@ -18,14 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 
 interface IntraQueuePreemptionComputePlugin {
@@ -34,14 +32,8 @@ interface IntraQueuePreemptionComputePlugin {
       String partition);
 
   void computeAppsIdealAllocation(Resource clusterResource,
-      TempQueuePerPartition tq,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
       float maxAllowablePreemptLimit);
-
-  Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
-      String partition);
-
-  boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
-      Resource clusterResource, Resource usedResource, RMContainer c);
 }

+ 1 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -81,16 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
  */
 public class ProportionalCapacityPreemptionPolicy
     implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
-
-  /**
-   * IntraQueuePreemptionOrder will be used to define various priority orders
-   * which could be configured by admin.
-   */
-  @Unstable
-  public enum IntraQueuePreemptionOrderPolicy {
-    PRIORITY_FIRST, USERLIMIT_FIRST;
-  }
-
   private static final Log LOG =
     LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
 
@@ -107,7 +96,6 @@
 
   private float maxAllowableLimitForIntraQueuePreemption;
   private float minimumThresholdForIntraQueuePreemption;
-  private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
 
   // Pointer to other RM components
   private RMContext rmContext;
@@ -203,13 +191,6 @@ public class ProportionalCapacityPreemptionPolicy
         CapacitySchedulerConfiguration.
         DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);

-    intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
-        .valueOf(csConfig
-            .get(
-                CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-                CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
-            .toUpperCase());
-
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();

@@ -262,6 +243,7 @@ public class ProportionalCapacityPreemptionPolicy
     }
   }

+  @SuppressWarnings("unchecked")
   private void preemptOrkillSelectedContainerAfterWait(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       long currentTime) {
@@ -674,9 +656,4 @@ public class ProportionalCapacityPreemptionPolicy
     }
     underServedQueues.add(queueName);
   }
-
-  @Override
-  public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
-    return intraQueuePreemptionOrderPolicy;
-  }
 }

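The enum and config lookup removed above follow a common Configuration pattern: read a lower-case string and map it onto an enum via valueOf. A self-contained sketch of just that pattern (the enum constants come from the diff; the demo class is hypothetical):

    enum IntraQueuePreemptionOrderPolicy { PRIORITY_FIRST, USERLIMIT_FIRST }

    class OrderPolicyParseDemo {
      public static void main(String[] args) {
        // What csConfig.get(key, "userlimit_first") would hand back.
        String raw = "userlimit_first";
        IntraQueuePreemptionOrderPolicy policy =
            IntraQueuePreemptionOrderPolicy.valueOf(raw.toUpperCase());
        System.out.println(policy); // USERLIMIT_FIRST
      }
    }

Per the commit message, the subsequent cherry-pick of the trunk commit for YARN-2113 is expected to restore this lookup.
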
+ 8 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java

@@ -59,17 +59,13 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("NAME: " + getApplicationId())
-        .append("  PRIO: ").append(priority)
-        .append("  CUR: ").append(getUsed())
-        .append("  PEN: ").append(pending)
-        .append("  RESERVED: ").append(reserved)
-        .append("  IDEAL_ASSIGNED: ").append(idealAssigned)
-        .append("  PREEMPT_OTHER: ").append(getToBePreemptFromOther())
-        .append("  IDEAL_PREEMPT: ").append(toBePreempted)
-        .append("  ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
-        .append("  SELECTED: ").append(selected)
-        .append("\n");
+    sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
+        .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
+        .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" PREEMPT_OTHER: ")
+        .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
+        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+        .append(getActuallyToBePreempted()).append("\n");

     return sb.toString();
   }
@@ -95,12 +91,8 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
     return applicationId;
   }

-  public String getUser() {
-    return this.app.getUser();
-  }
-
   public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
-      Resource cluster, Resource toBeDeduct) {
+      Resource cluster, Resource toBeDeduct, String partition) {
     if (Resources.greaterThan(resourceCalculator, cluster,
         getActuallyToBePreempted(), toBeDeduct)) {
       Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);

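The guarded subtraction in the hunk above only deducts while the outstanding to-preempt amount strictly exceeds the deduction; the other branch lies outside the hunk. A simplified stand-in using longs instead of Resource/ResourceCalculator:

    class DeductDemo {
      static long actuallyToBePreempted = 30;

      // Mirrors the visible branch of deductActuallyToBePreempted.
      static void deduct(long toBeDeduct) {
        if (actuallyToBePreempted > toBeDeduct) {
          actuallyToBePreempted -= toBeDeduct;
        }
        // The diff truncates here; the non-greater case is handled elsewhere.
      }

      public static void main(String[] args) {
        deduct(10); // 30 -> 20
        deduct(25); // guard fails: 20 is not strictly greater than 25
        System.out.println(actuallyToBePreempted); // 20
      }
    }
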
+ 0 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java

@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;

 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;

 /**
  * Temporary data-structure tracking resource availability, pending resource
@@ -61,10 +59,6 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   int relativePriority = 0;
   TempQueuePerPartition parent = null;

-  // This will hold a temp user data structure and will hold userlimit,
-  // idealAssigned, used etc.
-  Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
-
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@@ -295,12 +289,4 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     return apps;
   }

-  public void addUserPerPartition(String userName,
-      TempUserPerPartition tmpUser) {
-    this.usersPerPartition.put(userName, tmpUser);
-  }
-
-  public Map<String, TempUserPerPartition> getUsersPerPartition() {
-    return usersPerPartition;
-  }
 }

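The removed usersPerPartition field was a LinkedHashMap, which iterates in insertion order, giving a deterministic per-user traversal (presumably why it was preferred over a plain HashMap). A small demonstration of the property the revert drops:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class InsertionOrderDemo {
      public static void main(String[] args) {
        Map<String, Long> usersPerPartition = new LinkedHashMap<>();
        usersPerPartition.put("user1", 30L);
        usersPerPartition.put("user2", 20L);
        // Prints user1 then user2, in the order they were added.
        usersPerPartition.forEach((u, used) -> System.out.println(u + " -> " + used));
      }
    }
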
+ 0 - 88
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java

@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-
-/**
- * Temporary data-structure tracking resource availability, pending resource
- * need, current utilization for a user.
- */
-public class TempUserPerPartition extends AbstractPreemptionEntity {
-
-  private final User user;
-  private Resource userLimit;
-  private boolean donePreemptionQuotaForULDelta = false;
-
-  TempUserPerPartition(User user, String queueName, Resource usedPerPartition,
-      Resource amUsedPerPartition, Resource reserved,
-      Resource pendingPerPartition) {
-    super(queueName, usedPerPartition, amUsedPerPartition, reserved,
-        pendingPerPartition);
-    this.user = user;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed())
-        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
-        .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ")
-        .append(getUserLimit()).append(" IDEAL_ASSIGNED: ")
-        .append(idealAssigned).append(" USED_WO_AMUSED: ")
-        .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ")
-        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
-        .append(getActuallyToBePreempted()).append("\n");
-
-    return sb.toString();
-  }
-
-  public String getUserName() {
-    return user.getUserName();
-  }
-
-  public Resource getUserLimit() {
-    return userLimit;
-  }
-
-  public void setUserLimit(Resource userLimitResource) {
-    this.userLimit = userLimitResource;
-  }
-
-  public boolean isUserLimitReached(ResourceCalculator rc,
-      Resource clusterResource) {
-    if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(),
-        userLimit)) {
-      return true;
-    }
-    return false;
-  }
-
-  public boolean isPreemptionQuotaForULDeltaDone() {
-    return this.donePreemptionQuotaForULDelta;
-  }
-
-  public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
-    this.donePreemptionQuotaForULDelta = done;
-  }
-}

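Among the deleted helpers, isUserLimitReached compares usage minus the AM share against the computed user limit; a user is flagged only when strictly above the limit. A stand-in with plain longs in place of Resource and ResourceCalculator:

    class UserLimitReachedDemo {
      static boolean isUserLimitReached(long usedDeductAM, long userLimit) {
        // Strict comparison: sitting exactly at the limit does not count.
        return usedDeductAM > userLimit;
      }

      public static void main(String[] args) {
        System.out.println(isUserLimitReached(55, 50)); // true
        System.out.println(isUserLimitReached(50, 50)); // false
      }
    }
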
+ 0 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -1233,14 +1233,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
       0.2f;

-   /**
-   * For intra-queue preemption, enforce a preemption order such as
-   * "userlimit_first" or "priority_first".
-   */
-  public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
-      + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
-  public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
-
   /**
    * Maximum application for a queue to be used when application per queue is
    * not defined. To be consistent with previous version the default value is set

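The removed constant composes its key from the scheduler's preemption prefixes. Assuming the standard CapacityScheduler monitor prefixes (an assumption; only the "preemption-order-policy" suffix appears in the diff), the full property would compose like this:

    class PreemptionKeyDemo {
      public static void main(String[] args) {
        // Prefix values are assumed from the usual CapacityScheduler layout.
        String preemptionPrefix =
            "yarn.resourcemanager.monitor.capacity.preemption.";
        String intraQueuePrefix = "intra-queue-preemption.";
        System.out.println(preemptionPrefix + intraQueuePrefix
            + "preemption-order-policy" + " = userlimit_first");
      }
    }
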
+ 13 - 54
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -486,7 +486,7 @@ public class LeafQueue extends AbstractCSQueue {
       writeLock.lock();
       User u = users.get(userName);
       if (null == u) {
-        u = new User(userName);
+        u = new User();
         users.put(userName, u);
       }
       return u;
@@ -1292,7 +1292,7 @@ public class LeafQueue extends AbstractCSQueue {
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
         computeUserLimit(application.getUser(), clusterResource, user,
-            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
         partition);
   }

@@ -1366,7 +1366,7 @@ public class LeafQueue extends AbstractCSQueue {
     // TODO, need consider headroom respect labels also
     Resource userLimit =
         computeUserLimit(application.getUser(), clusterResource, queueUser,
-            nodePartition, schedulingMode, true);
+            nodePartition, schedulingMode);

     setQueueResourceLimitsInfo(clusterResource);

@@ -1410,7 +1410,7 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock(NoLock.class)
   private Resource computeUserLimit(String userName,
       Resource clusterResource, User user,
-      String nodePartition, SchedulingMode schedulingMode, boolean forActive) {
+      String nodePartition, SchedulingMode schedulingMode) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
         clusterResource);

@@ -1462,21 +1462,16 @@ public class LeafQueue extends AbstractCSQueue {
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than
     // queue-hard-limit * ulMin
-
-    final int usersCount;
-    if (forActive) {
-      usersCount = activeUsersManager.getNumActiveUsers();
-    } else {
-      usersCount = users.size();
-    }
-
+    
+    final int activeUsers = activeUsersManager.getNumActiveUsers();
+    
     // User limit resource is determined by:
     // max{currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
         resourceCalculator, partitionResource,
         Resources.divideAndCeil(
-            resourceCalculator, currentCapacity, usersCount),
+            resourceCalculator, currentCapacity, activeUsers),
         Resources.divideAndCeil(
             resourceCalculator,
             Resources.multiplyAndRoundDown(
@@ -1524,16 +1519,14 @@ public class LeafQueue extends AbstractCSQueue {
           " qconsumed: " + queueUsage.getUsed() +
           " qconsumed: " + queueUsage.getUsed() +
           " consumedRatio: " + totalUserConsumedRatio +
           " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
           " currentCapacity: " + currentCapacity +
-          " activeUsers: " + usersCount +
+          " activeUsers: " + activeUsers +
           " clusterCapacity: " + clusterResource +
           " clusterCapacity: " + clusterResource +
           " resourceByLabel: " + partitionResource +
           " resourceByLabel: " + partitionResource +
           " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
           " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
           " Partition: " + nodePartition
           " Partition: " + nodePartition
       );
       );
     }
     }
-    if (forActive) {
-      user.setUserResourceLimit(userLimitResource);
-    }
+    user.setUserResourceLimit(userLimitResource);
     return userLimitResource;
   }

@@ -1962,14 +1955,11 @@ public class LeafQueue extends AbstractCSQueue {
     volatile int activeApplications = 0;
     private UsageRatios userUsageRatios = new UsageRatios();
     private WriteLock writeLock;
-    String userName;

-    @VisibleForTesting
-    public User(String name) {
+    User() {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       // Nobody uses read-lock now, will add it when necessary
       writeLock = lock.writeLock();
-      this.userName = name;
     }

     public ResourceUsage getResourceUsage() {
@@ -2083,15 +2073,6 @@ public class LeafQueue extends AbstractCSQueue {
     public void setUserResourceLimit(Resource userResourceLimit) {
       this.userResourceLimit = userResourceLimit;
     }
-
-    public String getUserName() {
-      return this.userName;
-    }
-
-    @VisibleForTesting
-    public void setResourceUsage(ResourceUsage resourceUsage) {
-      this.userResourceUsage = resourceUsage;
-    }
   }

   @Override
@@ -2177,7 +2158,7 @@ public class LeafQueue extends AbstractCSQueue {
           User user = getUser(userName);
           Resource headroom = Resources.subtract(
               computeUserLimit(app.getUser(), clusterResources, user, partition,
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
               user.getUsed(partition));
           // Make sure headroom is not negative.
           headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2214,7 +2195,7 @@ public class LeafQueue extends AbstractCSQueue {
     User user = getUser(userName);

     return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
   }

   @Override
@@ -2396,26 +2377,4 @@ public class LeafQueue extends AbstractCSQueue {
       writeLock.unlock();
     }
   }
-
-  /**
-   * Get all valid users in this queue.
-   * @return user list
-   */
-  public Set<String> getAllUsers() {
-    return this.users.keySet();
-  }
-
-  public Resource getResourceLimitForActiveUsers(String userName,
-      Resource clusterResource, String partition,
-      SchedulingMode schedulingMode) {
-    return computeUserLimit(userName, clusterResource, getUser(userName),
-        partition, schedulingMode, true);
-  }
-
-  public synchronized Resource getResourceLimitForAllUsers(String userName,
-      Resource clusterResource, String partition, SchedulingMode schedulingMode)
-  {
-    return computeUserLimit(userName, clusterResource, getUser(userName),
-        partition, schedulingMode, false);
-  }
 }

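computeUserLimit, as restored above, takes the larger of an even split across active users and the configured minimum-user-limit percentage of current capacity. A worked example with illustrative numbers (100 capacity, 3 active users, 25% minimum):

    class UserLimitFormulaDemo {
      public static void main(String[] args) {
        long currentCapacity = 100;
        int activeUsers = 3;
        float ulMinPercent = 0.25f; // illustrative minimum-user-limit-percent

        // divideAndCeil: round the even split up.
        long byActiveUsers = (currentCapacity + activeUsers - 1) / activeUsers; // 34
        long byPercentage = (long) (currentCapacity * ulMinPercent);            // 25
        System.out.println(Math.max(byActiveUsers, byPercentage)); // 34
      }
    }
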
+ 12 - 77
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java

@@ -42,10 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -71,6 +69,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -97,7 +96,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   Clock mClock = null;
   CapacitySchedulerConfiguration conf = null;
   CapacityScheduler cs = null;
-  @SuppressWarnings("rawtypes")
   EventHandler<SchedulerEvent> mDisp = null;
   ProportionalCapacityPreemptionPolicy policy = null;
   Resource clusterResource = null;
@@ -249,7 +247,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         if (containerId == 1) {
           when(rmc.isAMContainer()).thenReturn(true);
           when(app.getAMResource(label)).thenReturn(res);
-          when(app.getAppAMNodePartitionName()).thenReturn(label);
         }

         if (reserved) {
@@ -283,12 +280,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         containerId++;
       }

-      // If app has 0 container, and it has only pending, still make sure to
-      // update label.
-      if (repeat == 0) {
-        when(app.getAppAMNodePartitionName()).thenReturn(label);
-      }
-
       // Some more app specific aggregated data can be better filled here.
       when(app.getPriority()).thenReturn(pri);
       when(app.getUser()).thenReturn(userName);
@@ -324,15 +315,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   private void mockApplications(String appsConfig) {
     int id = 1;
     HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
-    HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
     LeafQueue queue = null;
-    int mulp = -1;
     for (String a : appsConfig.split(";")) {
       String[] strs = a.split("\t");
       String queueName = strs[0];
-      if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
-        mulp = 100 / (new Integer(strs[2]).intValue());
-      }

       // get containers
       List<RMContainer> liveContainers = new ArrayList<RMContainer>();
@@ -352,7 +338,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       when(app.getReservedContainers()).thenReturn(reservedContainers);
       when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(app.getApplicationId()).thenReturn(appId);
-      when(app.getQueueName()).thenReturn(queueName);

       // add to LeafQueue
       queue = (LeafQueue) nameToCSQueues.get(queueName);
@@ -366,70 +351,20 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       }

       users.add(app.getUser());
-
-      String label = app.getAppAMNodePartitionName();
-
-      // Get label to queue
-      HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
-          .get(label);
-      if (null == userResourceUsagePerQueue) {
-        userResourceUsagePerQueue = new HashMap<>();
-        userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
-      }
-
-      // Get queue to user based resource map
-      HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
-          .get(queueName);
-      if (null == userResourceUsage) {
-        userResourceUsage = new HashMap<>();
-        userResourceUsagePerQueue.put(queueName, userResourceUsage);
-      }
-
-      // Get user to its resource usage.
-      ResourceUsage usage = userResourceUsage.get(app.getUser());
-      if (null == usage) {
-        usage = new ResourceUsage();
-        userResourceUsage.put(app.getUser(), usage);
-      }
-
-      usage.incAMUsed(app.getAMResource(label));
-      usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
       id++;
       id++;
     }

-      for (String queueName : userMap.keySet()) {
-        queue = (LeafQueue) nameToCSQueues.get(queueName);
-        // Currently we have user-limit test support only for default label.
-        Resource totResoucePerPartition = partitionToResource.get("");
-        Resource capacity = Resources.multiply(totResoucePerPartition,
-            queue.getQueueCapacities().getAbsoluteCapacity());
-        HashSet<String> users = userMap.get(queue.getQueueName());
-        when(queue.getAllUsers()).thenReturn(users);
-        Resource userLimit;
-        if (mulp > 0) {
-          userLimit = Resources.divideAndCeil(rc, capacity, mulp);
-        } else {
-          userLimit = Resources.divideAndCeil(rc, capacity,
-              users.size());
-        }
-        LOG.debug("Updating user-limit from mock: totResoucePerPartition="
-            + totResoucePerPartition + ", capacity=" + capacity
-            + ", users.size()=" + users.size() + ", userlimit= " + userLimit
-            + ",label= " + label + ",queueName= " + queueName);
-
-        HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
-            .get(label).get(queueName);
-        for (String userName : users) {
-          User user = new User(userName);
-          if (userResourceUsage != null) {
-            user.setResourceUsage(userResourceUsage.get(userName));
-          }
-          when(queue.getUser(eq(userName))).thenReturn(user);
-          when(queue.getResourceLimitForAllUsers(eq(userName),
-              any(Resource.class), anyString(), any(SchedulingMode.class)))
-                  .thenReturn(userLimit);
-        }
+    for (String queueName : userMap.keySet()) {
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
+      // Currently we have user-limit test support only for default label.
+      Resource totResoucePerPartition = partitionToResource.get("");
+      Resource capacity = Resources.multiply(totResoucePerPartition,
+          queue.getQueueCapacities().getAbsoluteCapacity());
+      HashSet<String> users = userMap.get(queue.getQueueName());
+      Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+      for (String user : users) {
+        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+            anyString())).thenReturn(userLimit);
       }
     }
   }

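After the revert, the mock derives one user limit per queue as a plain even split of the queue's absolute capacity, rounded up. The rounding is the only subtlety; a stand-in for Resources.divideAndCeil over a single dimension:

    class MockUserLimitDemo {
      static long divideAndCeil(long capacity, int users) {
        return (capacity + users - 1) / users;
      }

      public static void main(String[] args) {
        System.out.println(divideAndCeil(100, 2)); // 50
        System.out.println(divideAndCeil(100, 3)); // 34, not 33
      }
    }
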
+ 4 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java

@@ -62,16 +62,12 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
      * Apps which are running at low priority (4) will have a few of their
      * resources preempted to meet the demand.
      */
-
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
-
     String labelsConfig = "=100,true;";
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
     String nodesConfig = // n1 has no label
         "n1= res=100";
         "n1= res=100";
     String queuesConfig =
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
         // guaranteed,max,used,pending,reserved
-        "root(=[100 100 79 120 0]);" + // root
+        "root(=[100 100 80 120 0]);" + // root
             "-a(=[11 100 11 50 0]);" + // a
             "-a(=[11 100 11 50 0]);" + // a
             "-b(=[40 100 38 60 0]);" + // b
             "-b(=[40 100 38 60 0]);" + // b
             "-c(=[20 100 10 10 0]);" + // c
             "-c(=[20 100 10 10 0]);" + // c
@@ -308,8 +304,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -363,8 +357,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
     // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
     conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
     conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
         (float) 0.1);
         (float) 0.1);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -419,8 +411,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -428,7 +418,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
         "root(=[100 100 95 170 0]);" + // root
-            "-a(=[60 100 70 35 0]);" + // a
+            "-a(=[60 100 70 50 0]);" + // a
             "-b(=[40 100 25 120 0])"; // b
             "-b(=[40 100 25 120 0])"; // b
 
 
     String appsConfig =
     String appsConfig =
@@ -477,8 +467,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -528,8 +516,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
      * cycle. Even though there is more demand and no other low-priority
      * apps are present, the AM container still needs to be spared.
      */
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -674,8 +660,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;" + // default partition
         "x=100,true"; // partition=x
@@ -736,8 +720,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -858,10 +840,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();

     // Considering user-limit of 50% since only 2 users are there, only preempt
-    // 14 more (5 is already running) eventhough demand is for 30. Ideally we
-    // must preempt 15. But 15th container will bring user1's usage to 20 which
-    // is same as user-limit. Hence skip 15th container.
-    verify(mDisp, times(14)).handle(argThat(
+    // 15 more (5 is already running) even though demand is for 30.
+    verify(mDisp, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -889,8 +869,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");

     String labelsConfig = "=100,true;" + // default partition
         "x=100,true"; // partition=x

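The 14-to-15 assertion change above is pure user-limit arithmetic. Taking the figures implied by the comment (two users, 50% user limit, 5 containers already running), the limit per user is 20, so 15 more containers fit; the reverted cross-user check additionally skipped the container that would land the other user exactly on its limit, hence 14 before this revert. A sketch with those assumed figures:

    class PreemptCountDemo {
      public static void main(String[] args) {
        long queueUsed = 40;            // assumed: 50% of 40 gives the limit of 20
        long userLimit = queueUsed / 2; // 20 per user
        long alreadyRunning = 5;
        System.out.println(userLimit - alreadyRunning); // 15 after the revert
      }
    }
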
+ 0 - 899
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java

@@ -1,899 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Test class for IntraQueuePreemption scenarios.
- */
-public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
-    extends
-      ProportionalCapacityPreemptionPolicyMockFramework {
-  @Before
-  public void setup() {
-    super.setup();
-    conf.setBoolean(
-        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
-    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
-  }
-
-  @Test
-  public void testSimpleIntraQueuePreemptionWithTwoUsers()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     * Preconditions:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 100  | 0       |
-     *   | app2 | user2 | 1        | 0    | 30      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. app1 of high priority
-     * has a demand of 0 and it's already using 100. app2 from user2 has a demand
-     * of 30, and UL is 50. 30 would be preempted from app1.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 30 0]);" + // root
-            "-a(=[100 100 100 30 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,100,false,0,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,0,false,30,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resource and it's well under its user-limit. Hence preempt
-    // resources from app1.
-    verify(mDisp, times(30)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
-
-  @Test
-  public void testNoIntraQueuePreemptionWithSingleUser()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 100  | 0       |
-     *   | app2 | user1 | 1        | 0    | 30      |
-     *   +--------------+----------+------+---------+
-     * Given a single user, lower-priority/late-submitted apps have to
-     * wait.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 30 0]);" + // root
-            "-a(=[100 100 100 30 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,100,false,0,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,0,false,30,user1)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resource. Since app1 and app2 are from the same user,
-    // there won't be any preemption.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
-
-  @Test
-  public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 50   | 0       |
-     *   | app2 | user2 | 1        | 30   | 30      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. app1 of high priority
-     * has a demand of 0 and it's already using 50. app2 from user2 has a demand
-     * of 30, and UL is 50. Since app1 is under UL, there should not be any
-     * preemption.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 80 30 0]);" + // root
-            "-a(=[100 100 80 30 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,50,false,0,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,30,false,30,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resource, but app1 is not above its user-limit, so
-    // there won't be any preemption.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
-
-  @Test
-  public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 2        | 100  | 0       |
-     *   | app2 | user2 | 1        | 0    | 30      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. app1 of high priority
-     * has a demand of 0 and it's already using 100. app2 from user2 has a demand
-     * of 30, and UL is 50. 30 would be preempted from app1.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 30 0]);" + // root
-            "-a(=[100 100 100 30 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(2,1,n1,,100,false,0,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,0,false,30,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resource and it's well under its user-limit. Hence preempt
-    // resources from app1 even though its priority is higher than app2's.
-    verify(mDisp, times(30)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
-
-  @Test
-  public void testIntraQueuePreemptionOfUserLimitWithMultipleApps()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 30   | 30      |
-     *   | app2 | user2 | 1        | 20   | 20      |
-     *   | app3 | user1 | 1        | 30   | 30      |
-     *   | app4 | user2 | 1        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. Now have multiple
-     * apps and check for preemption across apps.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 80 90 0]);" + // root
-            "-a(=[100 100 80 90 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,30,false,30,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,20,false,20,user2);" +
-            "a\t" // app3 in a
-            + "(1,1,n1,,30,false,30,user1);" +
-            "a\t" // app4 in a
-            + "(1,1,n1,,0,false,10,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2/app4 need more resources and are well under their user-limit. Hence
-    // preempt resources from app3 (compared to app1, app3 has lower priority).
-    verify(mDisp, times(9)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(3))));
-  }
-
-  @Test
-  public void testNoPreemptionOfUserLimitWithMultipleAppsAndSameUser()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 30   | 30      |
-     *   | app2 | user1 | 1        | 20   | 20      |
-     *   | app3 | user1 | 1        | 30   | 30      |
-     *   | app4 | user1 | 1        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. Now have multiple
-     * apps and check for preemption across apps.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 80 90 0]);" + // root
-            "-a(=[100 100 80 90 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,30,false,20,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,20,false,20,user1);" +
-            "a\t" // app3 in a
-            + "(1,1,n1,,30,false,30,user1);" +
-            "a\t" // app4 in a
-            + "(1,1,n1,,0,false,10,user1)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // All apps belong to the same user, so no preemption should occur from
-    // any of them.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(3))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(4))));
-  }
-
-  @Test
-  public void testIntraQueuePreemptionOfUserLimitWitAppsOfDifferentPriority()
-      throws IOException {
-    /**
-     * Queue structure is:
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 3        | 30   | 30      |
-     *   | app2 | user2 | 1        | 20   | 20      |
-     *   | app3 | user1 | 4        | 30   | 0       |
-     *   | app4 | user2 | 1        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * Hence in queueA of 100, each user has a quota of 50. Now have multiple
-     * apps and check for preemption across apps.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(
-        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 80 60 0]);" + // root
-            "-a(=[100 100 80 60 0])"; // b
-
-    String appsConfig =
-        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(3,1,n1,,30,false,30,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,20,false,20,user2);" + "a\t" // app3 in a
-            + "(4,1,n1,,30,false,0,user1);" + "a\t" // app4 in a
-            + "(1,1,n1,,0,false,10,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2/app4 need more resources and are well under their user-limit. Hence
-    // preempt resources from app1 (compared to app3, app1 has lower priority).
-    verify(mDisp, times(9)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
-
-  @Test
-  public void testIntraQueuePreemptionOfUserLimitInTwoQueues()
-      throws IOException {
-    /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *      /   \
-     *     a     b
-     * </pre>
-     *
-     * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
-     * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
-     * that intra-queue preemption could occur in two queues when user-limit
-     * irreuglarity is present in queue.
-     */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-            INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 90 80 0]);" + // root
-            "-a(=[60 100 55 60 0]);" + // a
-            "-b(=[40 100 35 20 0])"; // b
-
-    String appsConfig =
-        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(3,1,n1,,20,false,30,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,20,false,20,user2);" +
-            "a\t" // app3 in a
-            + "(4,1,n1,,15,false,0,user1);" +
-            "a\t" // app4 in a
-            + "(1,1,n1,,0,false,10,user2);" +
-            "b\t" // app5 in b
-            + "(3,1,n1,,25,false,10,user1);" +
-            "b\t" // app6 in b
-            + "(1,1,n1,,10,false,10,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2/app4 and app6 need more resources and are under their user-limits.
-    // Hence preempt resources from app1 in queue a and app5 in queue b.
-    verify(mDisp, times(4)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(4)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(5))));
-  }
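
The two times(4) expectations fall out of the same arithmetic applied per
queue, assuming each user's quota is 50% of the queue's guaranteed capacity
(two active users per queue). A small sketch with illustrative values:

    // Illustrative only: quotas guessed as 50% of guaranteed capacity.
    public class TwoQueueUserLimitSketch {
      public static void main(String[] args) {
        int quotaA = 60 / 2; // queue a guaranteed 60
        int quotaB = 40 / 2; // queue b guaranteed 40
        // With 1GB containers the deadzone arithmetic collapses to
        // used - quota - 1 (one container is kept above the limit).
        System.out.println((20 + 15) - quotaA - 1); // user1 in a -> 4
        System.out.println(25 - quotaB - 1);        // user1 in b -> 4
      }
    }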
-
-  @Test
-  public void testIntraQueuePreemptionWithTwoRequestingUsers()
-      throws IOException {
-    /**
-    * Queue structure is:
-    *
-    * <pre>
-    *       root
-    *        |
-    *        a
-    * </pre>
-    *
-    * Scenario:
-    *   Queue total resources: 100
-    *   Minimum user limit percent: 50%
-    *   +--------------+----------+------+---------+
-    *   | APP  | USER  | PRIORITY | USED | PENDING |
-    *   +--------------+----------+------+---------+
-    *   | app1 | user1 | 1        | 60   | 10      |
-    *   | app2 | user2 | 1        | 40   | 10      |
-    *   +--------------+----------+------+---------+
-    * Hence in queueA of 100, each user has a quota of 50. Now run multiple
-    * apps and check for preemption across apps.
-    */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 20 0]);" + // root
-            "-a(=[100 100 100 20 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,1,n1,,60,false,10,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,40,false,10,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resources and is well under its user limit. Hence
-    // preempt resources from app1.
-    verify(mDisp, times(9)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-  }
-
-  @Test
-  public void testNoIntraQueuePreemptionIfBelowUserLimitAndLowPriorityExtraUsers()
-      throws IOException {
-     /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     * Preconditions:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 50   | 0       |
-     *   | app2 | user2 | 1        | 50   | 0       |
-     *   | app3 | user3 | 0        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * This scenario should never preempt from either user1 or user2
-     */
-
-    // Set max preemption per round to 70% (this is different from the
-    // minimum user limit percent).
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.7);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 10 0]);" + // root
-            "-a(=[100 100 100 10 0])"; // a
-
-    String appsConfig =
-    // queueName\t\
-    //     (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
-        "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
-        "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
-        "a\t(0,1,n1,,0,false,10,user3)\t50";   // app3, user3
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // Neither user1 nor user2 is above its user limit, so no containers
-    // should be preempted from app1 or app2.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-  }
-
-  @Test
-  public void testNoIntraQueuePreemptionIfBelowUserLimitAndSamePriorityExtraUsers()
-      throws IOException {
-     /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     * Preconditions:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 50   | 0       |
-     *   | app2 | user2 | 1        | 50   | 0       |
-     *   | app3 | user3 | 1        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * This scenario should never preempt from either user1 or user2
-     */
-
-    // Set max preemption per round to 70% (this is different from the
-    // minimum user limit percent).
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.7);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 10 0]);" + // root
-            "-a(=[100 100 100 10 0])"; // a
-
-    String appsConfig =
-    // queueName\t\
-    //     (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
-        "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
-        "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
-        "a\t(1,1,n1,,0,false,10,user3)\t50";   // app3, user3
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // Neither user1 nor user2 is above its user limit, so no containers
-    // should be preempted from app1 or app2.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-  }
-
-  @Test
-  public void testNoIntraQueuePreemptionIfBelowUserLimitAndHighPriorityExtraUsers()
-      throws IOException {
-     /**
-     * Queue structure is:
-     *
-     * <pre>
-     *       root
-     *        |
-     *        a
-     * </pre>
-     *
-     * Scenario:
-     * Preconditions:
-     *   Queue total resources: 100
-     *   Minimum user limit percent: 50%
-     *   +--------------+----------+------+---------+
-     *   | APP  | USER  | PRIORITY | USED | PENDING |
-     *   +--------------+----------+------+---------+
-     *   | app1 | user1 | 1        | 50   | 0       |
-     *   | app2 | user2 | 1        | 50   | 0       |
-     *   | app3 | user3 | 5        | 0    | 10      |
-     *   +--------------+----------+------+---------+
-     * This scenario should never preempt from either user1 or user2
-     */
-
-    // Set max preemption per round to 70% (this is different from the
-    // minimum user limit percent).
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.7);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 10 0]);" + // root
-            "-a(=[100 100 100 10 0])"; // a
-
-    String appsConfig =
-    // queueName\t\
-    //     (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
-        "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
-        "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
-        "a\t(5,1,n1,,0,false,10,user3)\t50";   // app3, user3
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // Neither user1 nor user2 is above its user limit, so no containers
-    // should be preempted from app1 or app2, even for a high-priority
-    // demanding app.
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-  }
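
All three "extra users" tests encode the same invariant: a user at or below
its computed user limit is never a preemption victim, regardless of the
demanding application's priority. A hedged sketch of that guard (the real
check lives in the intra-queue preemption plugin and is not this simple):

    public class UserLimitGuardSketch {
      /** Illustrative guard: only usage above the limit is preemptable. */
      static boolean isPreemptable(int userUsed, int userLimit) {
        return userUsed > userLimit;
      }

      public static void main(String[] args) {
        // user1 and user2 each sit exactly at the 50 limit -> false.
        System.out.println(isPreemptable(50, 50));
      }
    }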
-
-  @Test
-  public void testNoIntraQueuePreemptionWithUserLimitDeadzone()
-      throws IOException {
-    /**
-    * Queue structure is:
-    *
-    * <pre>
-    *       root
-    *        |
-    *        a
-    * </pre>
-    *
-    * Scenario:
-    *   Queue total resources: 100
-    *   Minimum user limit percent: 50%
-    *   +--------------+----------+------+---------+
-    *   | APP  | USER  | PRIORITY | USED | PENDING |
-    *   +--------------+----------+------+---------+
-    *   | app1 | user1 | 1        | 60   | 10      |
-    *   | app2 | user2 | 1        | 40   | 10      |
-    *   +--------------+----------+------+---------+
-    * Hence in queueA of 100, each user has a quota of 50. Now run multiple
-    * apps and check for preemption across apps, while also ensuring that a
-    * user's usage does not drop below its user limit.
-    */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 20 0]);" + // root
-            "-a(=[100 100 100 20 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,3,n1,,20,false,10,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,40,false,10,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 needs more resources and is well under its user limit. Hence
-    // preempt 3 containers (9GB) from app1. We will not preempt the last
-    // container, as that may pull user1's usage below its user limit.
-    verify(mDisp, times(3)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-  }
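
With 3GB containers the deadzone is what caps the count at 3: a fourth
preemption would drop user1 from 51 to 48, below the 50 limit. The same loop
as before, sketched with containerSize = 3 (illustrative values only):

    public class DeadzoneSketch {
      public static void main(String[] args) {
        int used = 60, limit = 50, containerSize = 3, preempted = 0;
        // Preempt only while the user would stay above its limit afterwards.
        while (used - containerSize > limit) {
          used -= containerSize;
          preempted++;
        }
        // Prints "3 containers, 9GB"; user1 is left at 51, above the limit.
        System.out.println(preempted + " containers, " + (60 - used) + "GB");
      }
    }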
-
-  @Test
-  public void testIntraQueuePreemptionWithUserLimitDeadzoneAndPriority()
-      throws IOException {
-    /**
-    * Queue structure is:
-    *
-    * <pre>
-    *       root
-    *        |
-    *        a
-    * </pre>
-    *
-    * Scenario:
-    *   Queue total resources: 100
-    *   Minimum user limit percent: 50%
-    *   +--------------+----------+------+---------+
-    *   | APP  | USER  | PRIORITY | USED | PENDING |
-    *   +--------------+----------+------+---------+
-    *   | app1 | user1 | 1        | 60   | 10      |
-    *   | app2 | user1 | 2        | 0    | 10      |
-    *   | app3 | user2 | 1        | 40   | 20      |
-    *   +--------------+----------+------+---------+
-    * Hence in queueA of 100, each user has a quota of 50. Now run multiple
-    * apps and check for preemption across apps, while also ensuring that a
-    * user's usage does not drop below its user limit.
-    */
-
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100 100 100 20 0]);" + // root
-            "-a(=[100 100 100 20 0])"; // a
-
-    String appsConfig =
-    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,3,n1,,20,false,10,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(2,1,n1,,0,false,10,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,40,false,20,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app3 (user2) needs more resources and user2 is well under its user
-    // limit. Hence preempt 3 containers (9GB) from app1. We will not preempt
-    // the last container, as that may pull user1's usage below its user
-    // limit.
-    verify(mDisp, times(3)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-    verify(mDisp, times(0)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(2))));
-
-    // After the first round, 3 containers were preempted from app1 and the
-    // resource distribution is as below.
-    appsConfig =
-        // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
-        "a\t" // app1 in a
-            + "(1,3,n1,,17,false,10,user1);" + // app1 a
-            "a\t" // app2 in a
-            + "(2,1,n1,,0,false,10,user1);" + // app2 a
-            "a\t" // app2 in a
-            + "(1,1,n1,,49,false,11,user2)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // app2 has higher-priority demand within the same user 'user1'. However,
-    // user1's usage (51) is already within the user-limit deadzone, so there
-    // is no further preemption. We still see 3 containers while asserting,
-    // as they were already selected in the earlier round.
-    verify(mDisp, times(3)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(1))));
-  }
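
The second round illustrates the deadzone from the other side: user1 now
holds 51 against a limit of 50, and 51 - 3 = 48 would undershoot, so nothing
more is selected. Sketched with the same illustrative rule as above:

    // Illustrative second-round check (same deadzone rule as above).
    public class SecondRoundSketch {
      public static void main(String[] args) {
        int used = 51, limit = 50, containerSize = 3;
        // 51 - 3 = 48 < 50, so no container can be preempted this round.
        System.out.println(used - containerSize > limit); // false
      }
    }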
-}

+ 0 - 178
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java

@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Test class for intra-queue preemption scenarios with the
- * DominantResourceCalculator.
- */
-public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
-    extends
-      ProportionalCapacityPreemptionPolicyMockFramework {
-  @Before
-  public void setup() {
-    super.setup();
-    conf.setBoolean(
-        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
-    rc = new DominantResourceCalculator();
-    when(cs.getResourceCalculator()).thenReturn(rc);
-    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
-  }
-
-  @Test
-  public void testSimpleIntraQueuePreemptionWithVCoreResource()
-      throws IOException {
-    /**
-     * The simplest preemption test. Queue structure is:
-     *
-     * <pre>
-     *       root
-     *     /  | | \
-     *    a  b  c  d
-     * </pre>
-     *
-     * Guaranteed resources of a/b/c/d are 10:40:20:30. Total cluster
-     * resource = 100. Scenario: queue b has a few running apps, and two
-     * high-priority apps have demand. Apps running at low priority (4) will
-     * have some of their resources preempted to meet that demand.
-     */
-
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
-
-    String labelsConfig = "=100:200,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100:200";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100:50 100:50 80:40 120:60 0]);" + // root
-            "-a(=[10:5 100:50 10:5 50:25 0]);" + // a
-            "-b(=[40:20 100:50 40:20 60:30 0]);" + // b
-            "-c(=[20:10 100:50 10:5 10:5 0]);" + // c
-            "-d(=[30:15 100:50 20:10 0 0])"; // d
-
-    String appsConfig =
-        // queueName\t(priority,resource,host,expression,#repeat,reserved,
-        // pending)
-        "a\t" // app1 in a
-            + "(1,1:1,n1,,5,false,25:25);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1:1,n1,,5,false,25:25);" + // app2 a
-            "b\t" // app3 in b
-            + "(4,1:1,n1,,36,false,20:20);" + // app3 b
-            "b\t" // app4 in b
-            + "(4,1:1,n1,,2,false,10:10);" + // app4 b
-            "b\t" // app4 in b
-            + "(5,1:1,n1,,1,false,10:10);" + // app5 b
-            "b\t" // app4 in b
-            + "(6,1:1,n1,,1,false,10:10);" + // app6 in b
-            "c\t" // app1 in a
-            + "(1,1:1,n1,,10,false,10:10);" + "d\t" // app7 in c
-            + "(1,1:1,n1,,20,false,0)";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // For queue b, app3 and app4 are at lower priority. Hence take 8
-    // containers from them, as capped by the 20% intra-queue preemption
-    // limit.
-    verify(mDisp, times(1)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(4))));
-    verify(mDisp, times(7)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(3))));
-  }
-
-  @Test
-  public void testIntraQueuePreemptionWithDominantVCoreResource()
-      throws IOException {
-    /**
-     * The simplest preemption test. Queue structure is:
-     *
-     * <pre>
-     *     root
-     *     /  \
-     *    a    b
-     * </pre>
-     *
-     * Guaranteed resources of a/b are 40:60. Total cluster resource = 100.
-     * Scenario: queue b has a few running apps, and two high-priority apps
-     * have demand. Apps running at low priority (4) will have some of their
-     * resources preempted to meet that demand.
-     */
-
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
-    // Set max preemption limit as 50%.
-    conf.setFloat(CapacitySchedulerConfiguration.
-        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
-        (float) 0.5);
-
-    String labelsConfig = "=100:200,true;";
-    String nodesConfig = // n1 has no label
-        "n1= res=100:200";
-    String queuesConfig =
-        // guaranteed,max,used,pending,reserved
-        "root(=[100:50 100:50 50:40 110:60 0]);" + // root
-            "-a(=[40:20 100:50 9:9 50:30 0]);" + // a
-            "-b(=[60:30 100:50 40:30 60:30 0]);"; // b
-
-    String appsConfig =
-        // queueName\t(priority,resource,host,expression,#repeat,reserved,
-        // pending)
-        "a\t" // app1 in a
-            + "(1,2:1,n1,,4,false,25:25);" + // app1 a
-            "a\t" // app2 in a
-            + "(1,1:3,n1,,2,false,25:25);" + // app2 a
-            "b\t" // app3 in b
-            + "(4,2:1,n1,,10,false,20:20);" + // app3 b
-            "b\t" // app4 in b
-            + "(4,1:2,n1,,5,false,10:10);" + // app4 b
-            "b\t" // app5 in b
-            + "(5,1:1,n1,,5,false,30:20);" + // app5 b
-            "b\t" // app6 in b
-            + "(6,2:1,n1,,5,false,30:20);";
-
-    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-    policy.editSchedule();
-
-    // For queue b, app3 and app4 are at lower priority (and app5 is below
-    // app6), so containers are preempted from all three to meet the
-    // higher-priority demand.
-    verify(mDisp, times(9)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(3))));
-    verify(mDisp, times(4)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(4))));
-    verify(mDisp, times(4)).handle(argThat(
-        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
-            getAppAttemptId(5))));
-  }
-}
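
Under DominantResourceCalculator, "usage" in these expectations is a
container's dominant share of the cluster rather than raw memory. A hedged
sketch of the dominant-share comparison (simplified; the real calculator
operates on Resource objects, and this helper is invented for illustration):

    public class DominantShareSketch {
      /** Dominant share of a <memory, vcores> allocation in a cluster. */
      static double dominantShare(int mem, int vcores,
          int clusterMem, int clusterVcores) {
        return Math.max((double) mem / clusterMem,
            (double) vcores / clusterVcores);
      }

      public static void main(String[] args) {
        // Cluster is 100 memory : 200 vcores, as in these tests.
        System.out.println(dominantShare(2, 1, 100, 200)); // 0.02, memory-dominant
        System.out.println(dominantShare(1, 2, 100, 200)); // 0.01, vcore-dominant
      }
    }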