
YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend container allocation logic. Contributed by Wangda Tan

Jian He, 10 years ago
parent
commit
ba2313d614
11 changed files with 1011 additions and 692 deletions
  1. hadoop-yarn-project/CHANGES.txt (+3 -0)
  2. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java (+1 -7)
  3. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java (+10 -1)
  4. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (+5 -5)
  5. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java (+2 -2)
  6. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java (+28 -0)
  7. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java (+76 -0)
  8. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java (+115 -0)
  9. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java (+629 -0)
  10. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (+73 -585)
  11. hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (+69 -92)

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -391,6 +391,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2768. Avoid cloning Resource in FSAppAttempt#updateDemand.
     (Hong Zhiguo via kasha)
 
+    YARN-3983. Refactored CapacityScheduleri#FiCaSchedulerApp to easier extend
+    container allocation logic. (Wangda Tan via jianhe)
+
   BUG FIXES
 
     YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena 

+ 1 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java

@@ -53,13 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import com.google.common.collect.Sets;
 
 public abstract class AbstractCSQueue implements CSQueue {
-  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
-  
-  static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-  
-  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-  
+  private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);  
   CSQueue parent;
   final String queueName;
   volatile int numContainers;

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java

@@ -24,12 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
 @Unstable
 public class CSAssignment {
+  public static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
 
-  final private Resource resource;
+  public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
+  private Resource resource;
   private NodeType type;
   private RMContainer excessReservation;
   private FiCaSchedulerApp application;
@@ -67,6 +72,10 @@ public class CSAssignment {
   public Resource getResource() {
     return resource;
   }
+  
+  public void setResource(Resource resource) {
+    this.resource = resource;
+  }
 
   public NodeType getType() {
     return type;

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -777,7 +777,7 @@ public class LeafQueue extends AbstractCSQueue {
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
 
     // Check if this queue need more resource, simply skip allocation if this
@@ -789,7 +789,7 @@ public class LeafQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
 
     for (Iterator<FiCaSchedulerApp> assignmentIterator =
@@ -800,7 +800,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
           currentResourceLimits, application.getCurrentReservation(),
           schedulingMode)) {
-        return NULL_ASSIGNMENT;
+        return CSAssignment.NULL_ASSIGNMENT;
       }
       
       Resource userLimit =
@@ -846,11 +846,11 @@ public class LeafQueue extends AbstractCSQueue {
       } else if (!assignment.getSkipped()) {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
-        return NULL_ASSIGNMENT;
+        return CSAssignment.NULL_ASSIGNMENT;
       }
     }
 
-    return NULL_ASSIGNMENT;
+    return CSAssignment.NULL_ASSIGNMENT;
   }
 
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -384,7 +384,7 @@ public class ParentQueue extends AbstractCSQueue {
     // if our queue cannot access this node, just return
     if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
         && !accessibleToPartition(node.getPartition())) {
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
     
     // Check if this queue need more resource, simply skip allocation if this
@@ -396,7 +396,7 @@ public class ParentQueue extends AbstractCSQueue {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
-      return NULL_ASSIGNMENT;
+      return CSAssignment.NULL_ASSIGNMENT;
     }
     
     CSAssignment assignment = 

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AllocationState.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+public enum AllocationState {
+  APP_SKIPPED,
+  PRIORITY_SKIPPED,
+  LOCALITY_SKIPPED,
+  QUEUE_SKIPPED,
+  ALLOCATED,
+  RESERVED
+}

+ 76 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java

@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+public class ContainerAllocation {
+  public static final ContainerAllocation PRIORITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
+  
+  public static final ContainerAllocation APP_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
+
+  public static final ContainerAllocation QUEUE_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
+  
+  public static final ContainerAllocation LOCALITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
+  
+  RMContainer containerToBeUnreserved;
+  private Resource resourceToBeAllocated = Resources.none();
+  AllocationState state;
+  NodeType containerNodeType = NodeType.NODE_LOCAL;
+  NodeType requestNodeType = NodeType.NODE_LOCAL;
+  Container updatedContainer;
+
+  public ContainerAllocation(RMContainer containerToBeUnreserved,
+      Resource resourceToBeAllocated, AllocationState state) {
+    this.containerToBeUnreserved = containerToBeUnreserved;
+    this.resourceToBeAllocated = resourceToBeAllocated;
+    this.state = state;
+  }
+  
+  public RMContainer getContainerToBeUnreserved() {
+    return containerToBeUnreserved;
+  }
+  
+  public Resource getResourceToBeAllocated() {
+    if (resourceToBeAllocated == null) {
+      return Resources.none();
+    }
+    return resourceToBeAllocated;
+  }
+  
+  public AllocationState getAllocationState() {
+    return state;
+  }
+  
+  public NodeType getContainerNodeType() {
+    return containerNodeType;
+  }
+  
+  public Container getUpdatedContainer() {
+    return updatedContainer;
+  }
+}

+ 115 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * For an application, resource limits and resource requests, decide how to
+ * allocate container. This is to make application resource allocation logic
+ * extensible.
+ */
+public abstract class ContainerAllocator {
+  FiCaSchedulerApp application;
+  final ResourceCalculator rc;
+  final RMContext rmContext;
+  
+  public ContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    this.application = application;
+    this.rc = rc;
+    this.rmContext = rmContext;
+  }
+  
+  /**
+   * preAllocation is to perform checks, etc. to see if we can/cannot allocate
+   * container. It will put necessary information to returned
+   * {@link ContainerAllocation}. 
+   */
+  abstract ContainerAllocation preAllocation(
+      Resource clusterResource, FiCaSchedulerNode node,
+      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
+      Priority priority, RMContainer reservedContainer);
+  
+  /**
+   * doAllocation is to update application metrics, create containers, etc.
+   * According to allocating conclusion decided by preAllocation.
+   */
+  abstract ContainerAllocation doAllocation(
+      ContainerAllocation allocationResult, Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
+      RMContainer reservedContainer);
+  
+  boolean checkHeadroom(Resource clusterResource,
+      ResourceLimits currentResourceLimits, Resource required,
+      FiCaSchedulerNode node) {
+    // If headroom + currentReservation < required, we cannot allocate this
+    // require
+    Resource resourceCouldBeUnReserved = application.getCurrentReservation();
+    if (!application.getCSLeafQueue().getReservationContinueLooking()
+        || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+      // If we don't allow reservation continuous looking, OR we're looking at
+      // non-default node partition, we won't allow to unreserve before
+      // allocation.
+      resourceCouldBeUnReserved = Resources.none();
+    }
+    return Resources.greaterThanOrEqual(rc, clusterResource, Resources.add(
+        currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
+        required);
+  }
+  
+  /**
+   * allocate needs to handle following stuffs:
+   * 
+   * <ul>
+   * <li>Select request: Select a request to allocate. E.g. select a resource
+   * request based on requirement/priority/locality.</li>
+   * <li>Check if a given resource can be allocated based on resource
+   * availability</li>
+   * <li>Do allocation: this will decide/create allocated/reserved
+   * container, this will also update metrics</li>
+   * </ul>
+   */
+  public ContainerAllocation allocate(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result =
+        preAllocation(clusterResource, node, schedulingMode,
+            resourceLimits, priority, reservedContainer);
+    
+    if (AllocationState.ALLOCATED == result.state
+        || AllocationState.RESERVED == result.state) {
+      result = doAllocation(result, clusterResource, node,
+          schedulingMode, priority, reservedContainer);
+    }
+    
+    return result;
+  }
+}
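
Editor's note: the two-phase contract above is the extension point this patch introduces. preAllocation() performs the checks and records its conclusion in a ContainerAllocation, and allocate() only calls doAllocation() when that conclusion is ALLOCATED or RESERVED. As a rough sketch only (the class below is hypothetical and not part of this commit; it would have to live in the same allocator package because the abstract methods are package-private), a custom allocator would plug in like this:

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

// Hypothetical example, not part of this commit: an allocator that skips
// every priority in the decision phase, so the commit phase never runs.
public class NoOpContainerAllocator extends ContainerAllocator {

  public NoOpContainerAllocator(FiCaSchedulerApp application,
      ResourceCalculator rc, RMContext rmContext) {
    super(application, rc, rmContext);
  }

  @Override
  ContainerAllocation preAllocation(Resource clusterResource,
      FiCaSchedulerNode node, SchedulingMode schedulingMode,
      ResourceLimits resourceLimits, Priority priority,
      RMContainer reservedContainer) {
    // Decision phase: report that nothing can be allocated at this priority.
    return ContainerAllocation.PRIORITY_SKIPPED;
  }

  @Override
  ContainerAllocation doAllocation(ContainerAllocation allocationResult,
      Resource clusterResource, FiCaSchedulerNode node,
      SchedulingMode schedulingMode, Priority priority,
      RMContainer reservedContainer) {
    // Commit phase: unreachable here, because preAllocation() always skips.
    return allocationResult;
  }
}

FiCaSchedulerApp would then construct such an allocator instead of RegularContainerAllocator (see the containerAllocator field added later in this diff).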

+ 629 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -0,0 +1,629 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Allocate normal (new) containers, considers locality/label, etc. Using
+ * delayed scheduling mechanism to get better locality allocation.
+ */
+public class RegularContainerAllocator extends ContainerAllocator {
+  private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
+  
+  private ResourceRequest lastResourceRequest = null;
+  
+  public RegularContainerAllocator(FiCaSchedulerApp application,
+      ResourceCalculator rc, RMContext rmContext) {
+    super(application, rc, rmContext);
+  }
+  
+  private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority) {
+    if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
+      return ContainerAllocation.APP_SKIPPED;
+    }
+
+    ResourceRequest anyRequest =
+        application.getResourceRequest(priority, ResourceRequest.ANY);
+    if (null == anyRequest) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    // Required resource
+    Resource required = anyRequest.getCapability();
+
+    // Do we need containers at this 'priority'?
+    if (application.getTotalRequiredResources(priority) <= 0) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    // AM container allocation doesn't support non-exclusive allocation to
+    // avoid painful of preempt an AM container
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      RMAppAttempt rmAppAttempt =
+          rmContext.getRMApps().get(application.getApplicationId())
+              .getCurrentAppAttempt();
+      if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
+          && null == rmAppAttempt.getMasterContainer()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip allocating AM container to app_attempt="
+              + application.getApplicationAttemptId()
+              + ", don't allow to allocate AM container in non-exclusive mode");
+        }
+        return ContainerAllocation.APP_SKIPPED;
+      }
+    }
+
+    // Is the node-label-expression of this offswitch resource request
+    // matches the node's label?
+    // If not match, jump to next priority.
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(anyRequest,
+        node.getPartition(), schedulingMode)) {
+      return ContainerAllocation.PRIORITY_SKIPPED;
+    }
+
+    if (!application.getCSLeafQueue().getReservationContinueLooking()) {
+      if (!shouldAllocOrReserveNewContainer(priority, required)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("doesn't need containers based on reservation algo!");
+        }
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+    }
+
+    if (!checkHeadroom(clusterResource, resourceLimits, required, node)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("cannot allocate required resource=" + required
+            + " because of headroom");
+      }
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    // Inform the application it is about to get a scheduling opportunity
+    application.addSchedulingOpportunity(priority);
+
+    // Increase missed-non-partitioned-resource-request-opportunity.
+    // This is to make sure non-partitioned-resource-request will prefer
+    // to be allocated to non-partitioned nodes
+    int missedNonPartitionedRequestSchedulingOpportunity = 0;
+    if (anyRequest.getNodeLabelExpression()
+        .equals(RMNodeLabelsManager.NO_LABEL)) {
+      missedNonPartitionedRequestSchedulingOpportunity =
+          application
+              .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+    }
+
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      // Before doing allocation, we need to check scheduling opportunity to
+      // make sure : non-partitioned resource request should be scheduled to
+      // non-partitioned partition first.
+      if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
+          .getScheduler().getNumClusterNodes()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + " priority=" + priority
+              + " because missed-non-partitioned-resource-request"
+              + " opportunity under requred:" + " Now="
+              + missedNonPartitionedRequestSchedulingOpportunity + " required="
+              + rmContext.getScheduler().getNumClusterNodes());
+        }
+
+        return ContainerAllocation.APP_SKIPPED;
+      }
+    }
+    
+    return null;
+  }
+
+  @Override
+  ContainerAllocation preAllocation(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result;
+    if (null == reservedContainer) {
+      // pre-check when allocating new container
+      result =
+          preCheckForNewContainer(clusterResource, node, schedulingMode,
+              resourceLimits, priority);
+      if (null != result) {
+        return result;
+      }
+    } else {
+      // pre-check when allocating reserved container
+      if (application.getTotalRequiredResources(priority) == 0) {
+        // Release
+        return new ContainerAllocation(reservedContainer, null,
+            AllocationState.QUEUE_SKIPPED);
+      }
+    }
+
+    // Try to allocate containers on node
+    result =
+        assignContainersOnNode(clusterResource, node, priority,
+            reservedContainer, schedulingMode, resourceLimits);
+    
+    if (null == reservedContainer) {
+      if (result.state == AllocationState.PRIORITY_SKIPPED) {
+        // Don't count 'skipped nodes' as a scheduling opportunity!
+        application.subtractSchedulingOpportunity(priority);
+      }
+    }
+    
+    return result;
+  }
+  
+  public synchronized float getLocalityWaitFactor(
+      Priority priority, int clusterNodes) {
+    // Estimate: Required unique resources (i.e. hosts + racks)
+    int requiredResources = 
+        Math.max(application.getResourceRequests(priority).size() - 1, 0);
+    
+    // waitFactor can't be more than '1' 
+    // i.e. no point skipping more than clustersize opportunities
+    return Math.min(((float)requiredResources / clusterNodes), 1.0f);
+  }
+  
+  private int getActualNodeLocalityDelay() {
+    return Math.min(rmContext.getScheduler().getNumClusterNodes(), application
+        .getCSLeafQueue().getNodeLocalityDelay());
+  }
+
+  private boolean canAssign(Priority priority, FiCaSchedulerNode node,
+      NodeType type, RMContainer reservedContainer) {
+
+    // Clearly we need containers for this application...
+    if (type == NodeType.OFF_SWITCH) {
+      if (reservedContainer != null) {
+        return true;
+      }
+
+      // 'Delay' off-switch
+      ResourceRequest offSwitchRequest =
+          application.getResourceRequest(priority, ResourceRequest.ANY);
+      long missedOpportunities = application.getSchedulingOpportunities(priority);
+      long requiredContainers = offSwitchRequest.getNumContainers();
+
+      float localityWaitFactor =
+          getLocalityWaitFactor(priority, rmContext.getScheduler()
+              .getNumClusterNodes());
+
+      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
+    }
+
+    // Check if we need containers on this rack
+    ResourceRequest rackLocalRequest =
+        application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
+      return false;
+    }
+
+    // If we are here, we do need containers on this rack for RACK_LOCAL req
+    if (type == NodeType.RACK_LOCAL) {
+      // 'Delay' rack-local just a little bit...
+      long missedOpportunities = application.getSchedulingOpportunities(priority);
+      return getActualNodeLocalityDelay() < missedOpportunities;
+    }
+
+    // Check if we need containers on this host
+    if (type == NodeType.NODE_LOCAL) {
+      // Now check if we need containers on this host...
+      ResourceRequest nodeLocalRequest =
+          application.getResourceRequest(priority, node.getNodeName());
+      if (nodeLocalRequest != null) {
+        return nodeLocalRequest.getNumContainers() > 0;
+      }
+    }
+
+    return false;
+  }
+
+  private ContainerAllocation assignNodeLocalContainers(
+      Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    // Skip node-local request, go to rack-local request
+    return ContainerAllocation.LOCALITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignRackLocalContainers(
+      Resource clusterResource, ResourceRequest rackLocalResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    // Skip rack-local request, go to off-switch request
+    return ContainerAllocation.LOCALITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignOffSwitchContainers(
+      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+    if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
+      return assignContainer(clusterResource, node, priority,
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
+          schedulingMode, currentResoureLimits);
+    }
+
+    return ContainerAllocation.QUEUE_SKIPPED;
+  }
+
+  private ContainerAllocation assignContainersOnNode(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
+      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
+
+    ContainerAllocation assigned;
+
+    NodeType requestType = null;
+    // Data-local
+    ResourceRequest nodeLocalResourceRequest =
+        application.getResourceRequest(priority, node.getNodeName());
+    if (nodeLocalResourceRequest != null) {
+      requestType = NodeType.NODE_LOCAL;
+      assigned =
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      if (Resources.greaterThan(rc, clusterResource,
+          assigned.getResourceToBeAllocated(), Resources.none())) {
+        assigned.requestNodeType = requestType;
+        return assigned;
+      }
+    }
+
+    // Rack-local
+    ResourceRequest rackLocalResourceRequest =
+        application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalResourceRequest != null) {
+      if (!rackLocalResourceRequest.getRelaxLocality()) {
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+
+      if (requestType != NodeType.NODE_LOCAL) {
+        requestType = NodeType.RACK_LOCAL;
+      }
+
+      assigned =
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      if (Resources.greaterThan(rc, clusterResource,
+          assigned.getResourceToBeAllocated(), Resources.none())) {
+        assigned.requestNodeType = requestType;
+        return assigned;
+      }
+    }
+
+    // Off-switch
+    ResourceRequest offSwitchResourceRequest =
+        application.getResourceRequest(priority, ResourceRequest.ANY);
+    if (offSwitchResourceRequest != null) {
+      if (!offSwitchResourceRequest.getRelaxLocality()) {
+        return ContainerAllocation.PRIORITY_SKIPPED;
+      }
+      if (requestType != NodeType.NODE_LOCAL
+          && requestType != NodeType.RACK_LOCAL) {
+        requestType = NodeType.OFF_SWITCH;
+      }
+
+      assigned =
+          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+              node, priority, reservedContainer, schedulingMode,
+              currentResoureLimits);
+      assigned.requestNodeType = requestType;
+
+      return assigned;
+    }
+
+    return ContainerAllocation.PRIORITY_SKIPPED;
+  }
+
+  private ContainerAllocation assignContainer(Resource clusterResource,
+      FiCaSchedulerNode node, Priority priority, ResourceRequest request,
+      NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
+      ResourceLimits currentResoureLimits) {
+    lastResourceRequest = request;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assignContainers: node=" + node.getNodeName()
+        + " application=" + application.getApplicationId()
+        + " priority=" + priority.getPriority()
+        + " request=" + request + " type=" + type);
+    }
+
+    // check if the resource request can access the label
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+        node.getPartition(), schedulingMode)) {
+      // this is a reserved container, but we cannot allocate it now according
+      // to label not match. This can be caused by node label changed
+      // We should un-reserve this container.
+      return new ContainerAllocation(rmContainer, null,
+          AllocationState.QUEUE_SKIPPED);
+    }
+
+    Resource capability = request.getCapability();
+    Resource available = node.getAvailableResource();
+    Resource totalResource = node.getTotalResource();
+
+    if (!Resources.lessThanOrEqual(rc, clusterResource,
+        capability, totalResource)) {
+      LOG.warn("Node : " + node.getNodeID()
+          + " does not have sufficient resource for request : " + request
+          + " node total capability : " + node.getTotalResource());
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    assert Resources.greaterThan(
+        rc, clusterResource, available, Resources.none());
+
+    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+        priority, capability);
+
+    // Can we allocate a container on this node?
+    int availableContainers =
+        rc.computeAvailableContainers(available, capability);
+
+    // How much need to unreserve equals to:
+    // max(required - headroom, amountNeedUnreserve)
+    Resource resourceNeedToUnReserve =
+        Resources.max(rc, clusterResource,
+            Resources.subtract(capability, currentResoureLimits.getHeadroom()),
+            currentResoureLimits.getAmountNeededUnreserve());
+
+    boolean needToUnreserve =
+        Resources.greaterThan(rc, clusterResource,
+            resourceNeedToUnReserve, Resources.none());
+
+    RMContainer unreservedContainer = null;
+    boolean reservationsContinueLooking =
+        application.getCSLeafQueue().getReservationContinueLooking();
+
+    if (availableContainers > 0) {
+      // Allocate...
+      // We will only do continuous reservation when this is not allocated from
+      // reserved container
+      if (rmContainer == null && reservationsContinueLooking
+          && node.getLabels().isEmpty()) {
+        // when reservationsContinueLooking is set, we may need to unreserve
+        // some containers to meet this queue, its parents', or the users'
+        // resource limits.
+        // TODO, need change here when we want to support continuous reservation
+        // looking for labeled partitions.
+        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+          if (!needToUnreserve) {
+            // If we shouldn't allocate/reserve new container then we should
+            // unreserve one the same size we are asking for since the
+            // currentResoureLimits.getAmountNeededUnreserve could be zero. If
+            // the limit was hit then use the amount we need to unreserve to be
+            // under the limit.
+            resourceNeedToUnReserve = capability;
+          }
+          unreservedContainer =
+              application.findNodeToUnreserve(clusterResource, node, priority,
+                  resourceNeedToUnReserve);
+          // When (minimum-unreserved-resource > 0 OR we cannot allocate
+          // new/reserved
+          // container (That means we *have to* unreserve some resource to
+          // continue)). If we failed to unreserve some resource, we can't
+          // continue.
+          if (null == unreservedContainer) {
+            return ContainerAllocation.QUEUE_SKIPPED;
+          }
+        }
+      }
+
+      ContainerAllocation result =
+          new ContainerAllocation(unreservedContainer, request.getCapability(),
+              AllocationState.ALLOCATED);
+      result.containerNodeType = type;
+      return result;
+    } else {
+      // if we are allowed to allocate but this node doesn't have space, reserve it or
+      // if this was an already a reserved container, reserve it again
+      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+        if (reservationsContinueLooking && rmContainer == null) {
+          // we could possibly ignoring queue capacity or user limits when
+          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
+          // one.
+          if (needToUnreserve) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("we needed to unreserve to be able to allocate");
+            }
+            return ContainerAllocation.QUEUE_SKIPPED;
+          }
+        }
+
+        ContainerAllocation result =
+            new ContainerAllocation(null, request.getCapability(),
+                AllocationState.RESERVED);
+        result.containerNodeType = type;
+        return result;
+      }
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+  }
+
+  boolean
+      shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
+    int requiredContainers = application.getTotalRequiredResources(priority);
+    int reservedContainers = application.getNumReservedContainers(priority);
+    int starvation = 0;
+    if (reservedContainers > 0) {
+      float nodeFactor =
+          Resources
+              .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
+
+      // Use percentage of node required to bias against large containers...
+      // Protect against corner case where you need the whole node with
+      // Math.min(nodeFactor, minimumAllocationFactor)
+      starvation =
+          (int) ((application.getReReservations(priority) / 
+              (float) reservedContainers) * (1.0f - (Math.min(
+                  nodeFactor, application.getCSLeafQueue()
+                  .getMinimumAllocationFactor()))));
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("needsContainers:" + " app.#re-reserve="
+            + application.getReReservations(priority) + " reserved="
+            + reservedContainers + " nodeFactor=" + nodeFactor
+            + " minAllocFactor="
+            + application.getCSLeafQueue().getMinimumAllocationFactor()
+            + " starvation=" + starvation);
+      }
+    }
+    return (((starvation + requiredContainers) - reservedContainers) > 0);
+  }
+  
+  private Container getContainer(RMContainer rmContainer,
+      FiCaSchedulerNode node, Resource capability, Priority priority) {
+    return (rmContainer != null) ? rmContainer.getContainer()
+        : createContainer(node, capability, priority);
+  }
+
+  private Container createContainer(FiCaSchedulerNode node, Resource capability,
+      Priority priority) {
+
+    NodeId nodeId = node.getRMNode().getNodeID();
+    ContainerId containerId =
+        BuilderUtils.newContainerId(application.getApplicationAttemptId(),
+            application.getNewContainerId());
+
+    // Create the container
+    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
+        .getHttpAddress(), capability, priority, null);
+  }
+  
+  private ContainerAllocation handleNewContainerAllocation(
+      ContainerAllocation allocationResult, FiCaSchedulerNode node,
+      Priority priority, RMContainer reservedContainer, Container container) {
+    // Handling container allocation
+    // Did we previously reserve containers at this 'priority'?
+    if (reservedContainer != null) {
+      application.unreserve(priority, node, reservedContainer);
+    }
+    
+    // Inform the application
+    RMContainer allocatedContainer =
+        application.allocate(allocationResult.containerNodeType, node,
+            priority, lastResourceRequest, container);
+
+    // Does the application need this resource?
+    if (allocatedContainer == null) {
+      // Skip this app if we failed to allocate.
+      ContainerAllocation ret =
+          new ContainerAllocation(allocationResult.containerToBeUnreserved,
+              null, AllocationState.QUEUE_SKIPPED);
+      ret.state = AllocationState.APP_SKIPPED;
+      return ret;
+    }
+
+    // Inform the node
+    node.allocateContainer(allocatedContainer);
+    
+    // update locality statistics
+    application.incNumAllocatedContainers(allocationResult.containerNodeType,
+        allocationResult.requestNodeType);
+    
+    return allocationResult;    
+  }
+
+  @Override
+  ContainerAllocation doAllocation(ContainerAllocation allocationResult,
+      Resource clusterResource, FiCaSchedulerNode node,
+      SchedulingMode schedulingMode, Priority priority,
+      RMContainer reservedContainer) {
+    // Create the container if necessary
+    Container container =
+        getContainer(reservedContainer, node,
+            allocationResult.getResourceToBeAllocated(), priority);
+
+    // something went wrong getting/creating the container
+    if (container == null) {
+      LOG.warn("Couldn't get container for allocation!");
+      return ContainerAllocation.QUEUE_SKIPPED;
+    }
+
+    if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
+      // When allocating container
+      allocationResult =
+          handleNewContainerAllocation(allocationResult, node, priority,
+              reservedContainer, container);
+    } else {
+      // When reserving container
+      application.reserve(priority, node, reservedContainer, container);
+    }
+    allocationResult.updatedContainer = container;
+
+    // Only reset opportunities when we FIRST allocate the container. (IAW, When
+    // reservedContainer != null, it's not the first time)
+    if (reservedContainer == null) {
+      // Don't reset scheduling opportunities for off-switch assignments
+      // otherwise the app will be delayed for each non-local assignment.
+      // This helps apps with many off-cluster requests schedule faster.
+      if (allocationResult.containerNodeType != NodeType.OFF_SWITCH) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Resetting scheduling opportunities");
+        }
+        application.resetSchedulingOpportunities(priority);
+      }
+      
+      // Non-exclusive scheduling opportunity is different: we need reset
+      // it every time to make sure non-labeled resource request will be
+      // most likely allocated on non-labeled nodes first.
+      application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
+    }
+
+    return allocationResult;
+  }
+}
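
Editor's note: the delay-scheduling logic carried over into canAssign()/getLocalityWaitFactor() above boils down to two thresholds: a rack-local request waits until the missed scheduling opportunities exceed min(clusterNodes, nodeLocalityDelay), and an off-switch request waits until they exceed requiredContainers * localityWaitFactor, where the wait factor is capped at 1. A standalone sketch of that arithmetic, using made-up numbers rather than values taken from the patch or any real cluster:

// Illustrative only: mirrors the thresholds checked by canAssign(), with
// hypothetical inputs.
public class DelaySchedulingSketch {
  public static void main(String[] args) {
    int clusterNodes = 100;
    int nodeLocalityDelay = 40;     // queue's configured node-locality delay
    int uniqueResources = 5;        // hosts + racks requested (ANY excluded)
    long requiredContainers = 10;   // containers still needed at this priority
    long missedOpportunities = 45;  // scheduling opportunities already skipped

    // waitFactor can't exceed 1: no point skipping more than clusterNodes times.
    float localityWaitFactor =
        Math.min((float) uniqueResources / clusterNodes, 1.0f);

    // Rack-local is allowed once we have waited past the node-locality delay.
    boolean rackLocalAllowed =
        Math.min(clusterNodes, nodeLocalityDelay) < missedOpportunities;

    // Off-switch is allowed once we have waited long enough relative to how
    // many containers are still outstanding.
    boolean offSwitchAllowed =
        (requiredContainers * localityWaitFactor) < missedOpportunities;

    System.out.println("rack-local allowed: " + rackLocalAllowed);  // true (40 < 45)
    System.out.println("off-switch allowed: " + offSwitchAllowed);  // true (0.5 < 45)
  }
}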

+ 73 - 585
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -40,9 +39,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -54,15 +51,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -78,11 +76,6 @@ import com.google.common.annotations.VisibleForTesting;
 public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   private static final Log LOG = LogFactory.getLog(FiCaSchedulerApp.class);
 
-  static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
-  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-
   private final Set<ContainerId> containersToPreempt =
     new HashSet<ContainerId>();
     
@@ -91,6 +84,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   private ResourceCalculator rc = new DefaultResourceCalculator();
 
   private ResourceScheduler scheduler;
+  
+  private ContainerAllocator containerAllocator;
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -124,6 +119,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     if (scheduler.getResourceCalculator() != null) {
       rc = scheduler.getResourceCalculator();
     }
+    
+    containerAllocator = new RegularContainerAllocator(this, rc, rmContext);
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
@@ -386,223 +383,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
   }
 
-  private int getActualNodeLocalityDelay() {
-    return Math.min(scheduler.getNumClusterNodes(), getCSLeafQueue()
-        .getNodeLocalityDelay());
-  }
-
-  private boolean canAssign(Priority priority, FiCaSchedulerNode node,
-      NodeType type, RMContainer reservedContainer) {
-
-    // Clearly we need containers for this application...
-    if (type == NodeType.OFF_SWITCH) {
-      if (reservedContainer != null) {
-        return true;
-      }
-
-      // 'Delay' off-switch
-      ResourceRequest offSwitchRequest =
-          getResourceRequest(priority, ResourceRequest.ANY);
-      long missedOpportunities = getSchedulingOpportunities(priority);
-      long requiredContainers = offSwitchRequest.getNumContainers();
-
-      float localityWaitFactor =
-          getLocalityWaitFactor(priority, scheduler.getNumClusterNodes());
-
-      return ((requiredContainers * localityWaitFactor) < missedOpportunities);
-    }
-
-    // Check if we need containers on this rack
-    ResourceRequest rackLocalRequest =
-        getResourceRequest(priority, node.getRackName());
-    if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
-      return false;
-    }
-
-    // If we are here, we do need containers on this rack for RACK_LOCAL req
-    if (type == NodeType.RACK_LOCAL) {
-      // 'Delay' rack-local just a little bit...
-      long missedOpportunities = getSchedulingOpportunities(priority);
-      return getActualNodeLocalityDelay() < missedOpportunities;
-    }
-
-    // Check if we need containers on this host
-    if (type == NodeType.NODE_LOCAL) {
-      // Now check if we need containers on this host...
-      ResourceRequest nodeLocalRequest =
-          getResourceRequest(priority, node.getNodeName());
-      if (nodeLocalRequest != null) {
-        return nodeLocalRequest.getNumContainers() > 0;
-      }
-    }
-
-    return false;
-  }
-
-  boolean
-      shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
-    int requiredContainers = getTotalRequiredResources(priority);
-    int reservedContainers = getNumReservedContainers(priority);
-    int starvation = 0;
-    if (reservedContainers > 0) {
-      float nodeFactor =
-          Resources.ratio(
-              rc, required, getCSLeafQueue().getMaximumAllocation()
-              );
-
-      // Use percentage of node required to bias against large containers...
-      // Protect against corner case where you need the whole node with
-      // Math.min(nodeFactor, minimumAllocationFactor)
-      starvation =
-          (int)((getReReservations(priority) / (float)reservedContainers) *
-                (1.0f - (Math.min(nodeFactor, getCSLeafQueue().getMinimumAllocationFactor())))
-               );
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("needsContainers:" +
-            " app.#re-reserve=" + getReReservations(priority) +
-            " reserved=" + reservedContainers +
-            " nodeFactor=" + nodeFactor +
-            " minAllocFactor=" + getCSLeafQueue().getMinimumAllocationFactor() +
-            " starvation=" + starvation);
-      }
-    }
-    return (((starvation + requiredContainers) - reservedContainers) > 0);
-  }
-
-  private CSAssignment assignNodeLocalContainers(Resource clusterResource,
-      ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.NODE_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-  }
-
-  private CSAssignment assignRackLocalContainers(Resource clusterResource,
-      ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.RACK_LOCAL,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
-  }
-
-  private CSAssignment assignOffSwitchContainers(Resource clusterResource,
-      ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
-      Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer,
-      SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    if (canAssign(priority, node, NodeType.OFF_SWITCH,
-        reservedContainer)) {
-      return assignContainer(clusterResource, node, priority,
-          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer, schedulingMode, currentResoureLimits);
-    }
-
-    return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
-  }
-
-  private CSAssignment assignContainersOnNode(Resource clusterResource,
-      FiCaSchedulerNode node, Priority priority,
-      RMContainer reservedContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-
-    CSAssignment assigned;
-
-    NodeType requestType = null;
-    MutableObject allocatedContainer = new MutableObject();
-    // Data-local
-    ResourceRequest nodeLocalResourceRequest =
-        getResourceRequest(priority, node.getNodeName());
-    if (nodeLocalResourceRequest != null) {
-      requestType = NodeType.NODE_LOCAL;
-      assigned =
-          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(rc, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          incNumAllocatedContainers(NodeType.NODE_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.NODE_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Rack-local
-    ResourceRequest rackLocalResourceRequest =
-        getResourceRequest(priority, node.getRackName());
-    if (rackLocalResourceRequest != null) {
-      if (!rackLocalResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-
-      if (requestType != NodeType.NODE_LOCAL) {
-        requestType = NodeType.RACK_LOCAL;
-      }
-
-      assigned =
-          assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-      if (Resources.greaterThan(rc, clusterResource,
-        assigned.getResource(), Resources.none())) {
-
-        //update locality statistics
-        if (allocatedContainer.getValue() != null) {
-          incNumAllocatedContainers(NodeType.RACK_LOCAL,
-            requestType);
-        }
-        assigned.setType(NodeType.RACK_LOCAL);
-        return assigned;
-      }
-    }
-
-    // Off-switch
-    ResourceRequest offSwitchResourceRequest =
-        getResourceRequest(priority, ResourceRequest.ANY);
-    if (offSwitchResourceRequest != null) {
-      if (!offSwitchResourceRequest.getRelaxLocality()) {
-        return SKIP_ASSIGNMENT;
-      }
-      if (requestType != NodeType.NODE_LOCAL
-          && requestType != NodeType.RACK_LOCAL) {
-        requestType = NodeType.OFF_SWITCH;
-      }
-
-      assigned =
-          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
-            node, priority, reservedContainer,
-            allocatedContainer, schedulingMode, currentResoureLimits);
-
-      // update locality statistics
-      if (allocatedContainer.getValue() != null) {
-        incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
-      }
-      assigned.setType(NodeType.OFF_SWITCH);
-      return assigned;
-    }
-
-    return SKIP_ASSIGNMENT;
-  }
-
   public void reserve(Priority priority,
       FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
     // Update reserved metrics if this is the first reservation
@@ -618,25 +398,6 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     node.reserveResource(this, priority, rmContainer);
   }
 
-  private Container getContainer(RMContainer rmContainer,
-      FiCaSchedulerNode node, Resource capability, Priority priority) {
-    return (rmContainer != null) ? rmContainer.getContainer()
-        : createContainer(node, capability, priority);
-  }
-
-  Container createContainer(FiCaSchedulerNode node, Resource capability,
-      Priority priority) {
-
-    NodeId nodeId = node.getRMNode().getNodeID();
-    ContainerId containerId =
-        BuilderUtils.newContainerId(getApplicationAttemptId(),
-            getNewContainerId());
-
-    // Create the container
-    return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
-        .getHttpAddress(), capability, priority, null);
-  }
-
   @VisibleForTesting
   public RMContainer findNodeToUnreserve(Resource clusterResource,
       FiCaSchedulerNode node, Priority priority,
@@ -672,203 +433,63 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return nodeToUnreserve.getReservedContainer();
   }
 
-  private LeafQueue getCSLeafQueue() {
+  public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
 
-  private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
-      Priority priority,
-      ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer, SchedulingMode schedulingMode,
-      ResourceLimits currentResoureLimits) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " application=" + getApplicationId()
-        + " priority=" + priority.getPriority()
-        + " request=" + request + " type=" + type);
-    }
-
-    // check if the resource request can access the label
-    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
-        node.getPartition(), schedulingMode)) {
-      // This is a reserved container, but we cannot allocate it now because
-      // its label no longer matches the node's partition (e.g. the node's
-      // label changed). We should un-reserve this container.
-      if (rmContainer != null) {
-        unreserve(priority, node, rmContainer);
-      }
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    Resource capability = request.getCapability();
-    Resource available = node.getAvailableResource();
-    Resource totalResource = node.getTotalResource();
-
-    if (!Resources.lessThanOrEqual(rc, clusterResource,
-        capability, totalResource)) {
-      LOG.warn("Node : " + node.getNodeID()
-          + " does not have sufficient resource for request : " + request
-          + " node total capability : " + node.getTotalResource());
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    assert Resources.greaterThan(
-        rc, clusterResource, available, Resources.none());
-
-    // Create the container if necessary
-    Container container =
-        getContainer(rmContainer, node, capability, priority);
-
-    // something went wrong getting/creating the container
-    if (container == null) {
-      LOG.warn("Couldn't get container for allocation!");
-      return new CSAssignment(Resources.none(), type);
-    }
-
-    boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
-        priority, capability);
-
-    // Can we allocate a container on this node?
-    int availableContainers =
-        rc.computeAvailableContainers(available, capability);
-
-    // How much need to unreserve equals to:
-    // max(required - headroom, amountNeedUnreserve)
-    Resource resourceNeedToUnReserve =
-        Resources.max(rc, clusterResource,
-            Resources.subtract(capability, currentResoureLimits.getHeadroom()),
-            currentResoureLimits.getAmountNeededUnreserve());
-
-    boolean needToUnreserve =
-        Resources.greaterThan(rc, clusterResource,
-            resourceNeedToUnReserve, Resources.none());
-
-    RMContainer unreservedContainer = null;
-    boolean reservationsContinueLooking =
-        getCSLeafQueue().getReservationContinueLooking();
-
-    if (availableContainers > 0) {
-      // Allocate...
-
-      // Did we previously reserve containers at this 'priority'?
-      if (rmContainer != null) {
-        unreserve(priority, node, rmContainer);
-      } else if (reservationsContinueLooking && node.getLabels().isEmpty()) {
-        // when reservationsContinueLooking is set, we may need to unreserve
-        // some containers to meet this queue, its parents', or the users' resource limits.
-        // TODO, need change here when we want to support continuous reservation
-        // looking for labeled partitions.
-        if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
-          if (!needToUnreserve) {
-            // If we shouldn't allocate/reserve new container then we should
-            // unreserve one the same size we are asking for since the
-            // currentResoureLimits.getAmountNeededUnreserve could be zero. If
-            // the limit was hit then use the amount we need to unreserve to be
-            // under the limit.
-            resourceNeedToUnReserve = capability;
-          }
-          unreservedContainer =
-              findNodeToUnreserve(clusterResource, node, priority,
-                  resourceNeedToUnReserve);
-          // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
-          // container (That means we *have to* unreserve some resource to
-          // continue)). If we failed to unreserve some resource, we can't continue.
-          if (null == unreservedContainer) {
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-      }
-
-      // Inform the application
-      RMContainer allocatedContainer =
-          allocate(type, node, priority, request, container);
-
-      // Does the application need this resource?
-      if (allocatedContainer == null) {
-        CSAssignment csAssignment =  new CSAssignment(Resources.none(), type);
-        csAssignment.setApplication(this);
-        csAssignment.setExcessReservation(unreservedContainer);
-        return csAssignment;
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-
-      // Inform the ordering policy
-      getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
-          allocatedContainer);
-
-      LOG.info("assignedContainer" +
-          " application attempt=" + getApplicationAttemptId() +
-          " container=" + container +
-          " queue=" + this +
-          " clusterResource=" + clusterResource);
-      createdContainer.setValue(allocatedContainer);
-      CSAssignment assignment = new CSAssignment(container.getResource(), type);
-      assignment.getAssignmentInformation().addAllocationDetails(
-        container.getId(), getCSLeafQueue().getQueuePath());
-      assignment.getAssignmentInformation().incrAllocations();
-      assignment.setApplication(this);
-      Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-        container.getResource());
-
-      assignment.setExcessReservation(unreservedContainer);
-      return assignment;
-    } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was already a reserved container, reserve it again
-      if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
-        if (reservationsContinueLooking && rmContainer == null) {
-          // We could possibly be ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to
-          // unreserve one.
-          if (needToUnreserve) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("we needed to unreserve to be able to allocate");
-            }
-            return new CSAssignment(Resources.none(), type);
-          }
-        }
-
-        // Reserve by 'charging' in advance...
-        reserve(priority, node, rmContainer, container);
-
-        LOG.info("Reserved container " +
-            " application=" + getApplicationId() +
-            " resource=" + request.getCapability() +
-            " queue=" + this.toString() +
-            " cluster=" + clusterResource);
-        CSAssignment assignment =
-            new CSAssignment(request.getCapability(), type);
+  private CSAssignment getCSAssignmentFromAllocateResult(
+      Resource clusterResource, ContainerAllocation result) {
+    // Handle skipped
+    boolean skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment assignment = new CSAssignment(skipped);
+    assignment.setApplication(this);
+    
+    // Handle excess reservation
+    assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+    // If we allocated something
+    if (Resources.greaterThan(rc, clusterResource,
+        result.getResourceToBeAllocated(), Resources.none())) {
+      Resource allocatedResource = result.getResourceToBeAllocated();
+      Container updatedContainer = result.getUpdatedContainer();
+      
+      assignment.setResource(allocatedResource);
+      assignment.setType(result.getContainerNodeType());
+
+      if (result.getAllocationState() == AllocationState.RESERVED) {
+        // This is a reserved container
+        LOG.info("Reserved container " + " application=" + getApplicationId()
+            + " resource=" + allocatedResource + " queue="
+            + this.toString() + " cluster=" + clusterResource);
         assignment.getAssignmentInformation().addReservationDetails(
-          container.getId(), getCSLeafQueue().getQueuePath());
+            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
         assignment.getAssignmentInformation().incrReservations();
         Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-          request.getCapability());
-        return assignment;
+            allocatedResource);
+        assignment.setFulfilledReservation(true);
+      } else {
+        // This is a new container
+        // Inform the ordering policy
+        LOG.info("assignedContainer" + " application attempt="
+            + getApplicationAttemptId() + " container="
+            + updatedContainer.getId() + " queue=" + this + " clusterResource="
+            + clusterResource);
+
+        getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
+            getRMContainer(updatedContainer.getId()));
+
+        assignment.getAssignmentInformation().addAllocationDetails(
+            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
+        assignment.getAssignmentInformation().incrAllocations();
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            allocatedResource);
       }
-      return new CSAssignment(Resources.none(), type);
     }
+    
+    return assignment;
   }
-
-  private boolean checkHeadroom(Resource clusterResource,
-      ResourceLimits currentResourceLimits, Resource required, FiCaSchedulerNode node) {
-    // If headroom + currentReservation < required, we cannot allocate this
-    // request
-    Resource resourceCouldBeUnReserved = getCurrentReservation();
-    if (!getCSLeafQueue().getReservationContinueLooking() || !node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
-      // If we don't allow reservation continuous looking, OR we're looking at
-      // non-default node partition, we won't allow to unreserve before
-      // allocation.
-      resourceCouldBeUnReserved = Resources.none();
-    }
-    return Resources
-        .greaterThanOrEqual(rc, clusterResource, Resources.add(
-            currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
-            required);
-  }
-
+  
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
       SchedulingMode schedulingMode) {
@@ -886,174 +507,41 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             + ", because it doesn't need more resource, schedulingMode="
             + schedulingMode.name() + " node-label=" + node.getPartition());
       }
-      return SKIP_ASSIGNMENT;
+      return CSAssignment.SKIP_ASSIGNMENT;
     }
 
     synchronized (this) {
-      // Check if this resource is on the blacklist
-      if (SchedulerAppUtils.isBlacklisted(this, node, LOG)) {
-        return SKIP_ASSIGNMENT;
-      }
-
       // Schedule in priority order
       for (Priority priority : getPriorities()) {
-        ResourceRequest anyRequest =
-            getResourceRequest(priority, ResourceRequest.ANY);
-        if (null == anyRequest) {
-          continue;
-        }
-
-        // Required resource
-        Resource required = anyRequest.getCapability();
-
-        // Do we need containers at this 'priority'?
-        if (getTotalRequiredResources(priority) <= 0) {
-          continue;
-        }
-
-        // AM container allocation doesn't support non-exclusive allocation to
-        // avoid painful of preempt an AM container
-        if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+        ContainerAllocation allocationResult =
+            containerAllocator.allocate(clusterResource, node,
+                schedulingMode, currentResourceLimits, priority, null);
 
-          RMAppAttempt rmAppAttempt =
-              rmContext.getRMApps()
-                  .get(getApplicationId()).getCurrentAppAttempt();
-          if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
-              && null == rmAppAttempt.getMasterContainer()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skip allocating AM container to app_attempt="
-                  + getApplicationAttemptId()
-                  + ", don't allow to allocate AM container in non-exclusive mode");
-            }
-            break;
-          }
-        }
+        // If it's a skipped allocation
+        AllocationState allocationState = allocationResult.getAllocationState();
 
-        // Is the node-label-expression of this offswitch resource request
-        // matches the node's label?
-        // If not match, jump to next priority.
-        if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
-            anyRequest, node.getPartition(), schedulingMode)) {
+        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
           continue;
         }
-
-        if (!getCSLeafQueue().getReservationContinueLooking()) {
-          if (!shouldAllocOrReserveNewContainer(priority, required)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("doesn't need containers based on reservation algo!");
-            }
-            continue;
-          }
-        }
-
-        if (!checkHeadroom(clusterResource, currentResourceLimits, required,
-            node)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("cannot allocate required resource=" + required
-                + " because of headroom");
-          }
-          return NULL_ASSIGNMENT;
-        }
-
-        // Inform the application it is about to get a scheduling opportunity
-        addSchedulingOpportunity(priority);
-
-        // Increase missed-non-partitioned-resource-request-opportunity.
-        // This is to make sure non-partitioned-resource-request will prefer
-        // to be allocated to non-partitioned nodes
-        int missedNonPartitionedRequestSchedulingOpportunity = 0;
-        if (anyRequest.getNodeLabelExpression().equals(
-            RMNodeLabelsManager.NO_LABEL)) {
-          missedNonPartitionedRequestSchedulingOpportunity =
-              addMissedNonPartitionedRequestSchedulingOpportunity(priority);
-        }
-
-        if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-          // Before doing allocation, we need to check scheduling opportunity to
-          // make sure : non-partitioned resource request should be scheduled to
-          // non-partitioned partition first.
-          if (missedNonPartitionedRequestSchedulingOpportunity < rmContext
-              .getScheduler().getNumClusterNodes()) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Skip app_attempt="
-                  + getApplicationAttemptId() + " priority="
-                  + priority
-                  + " because missed-non-partitioned-resource-request"
-                  + " opportunity under requred:" + " Now="
-                  + missedNonPartitionedRequestSchedulingOpportunity
-                  + " required="
-                  + rmContext.getScheduler().getNumClusterNodes());
-            }
-
-            return SKIP_ASSIGNMENT;
-          }
-        }
-
-        // Try to schedule
-        CSAssignment assignment =
-            assignContainersOnNode(clusterResource, node,
-                priority, null, schedulingMode, currentResourceLimits);
-
-        // Did the application skip this node?
-        if (assignment.getSkipped()) {
-          // Don't count 'skipped nodes' as a scheduling opportunity!
-          subtractSchedulingOpportunity(priority);
-          continue;
-        }
-
-        // Did we schedule or reserve a container?
-        Resource assigned = assignment.getResource();
-        if (Resources.greaterThan(rc, clusterResource,
-            assigned, Resources.none())) {
-          // Don't reset scheduling opportunities for offswitch assignments
-          // otherwise the app will be delayed for each non-local assignment.
-          // This helps apps with many off-cluster requests schedule faster.
-          if (assignment.getType() != NodeType.OFF_SWITCH) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Resetting scheduling opportunities");
-            }
-            resetSchedulingOpportunities(priority);
-          }
-          // Non-exclusive scheduling opportunity is different: we need reset
-          // it every time to make sure non-labeled resource request will be
-          // most likely allocated on non-labeled nodes first.
-          resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-
-          // Done
-          return assignment;
-        } else {
-          // Do not assign out of order w.r.t priorities
-          return SKIP_ASSIGNMENT;
-        }
+        return getCSAssignmentFromAllocateResult(clusterResource,
+            allocationResult);
       }
     }
 
-    return SKIP_ASSIGNMENT;
+    // We will reach here if we skipped all priorities of the app, so we will
+    // skip the app.
+    return CSAssignment.SKIP_ASSIGNMENT;
   }
 
 
   public synchronized CSAssignment assignReservedContainer(
       FiCaSchedulerNode node, RMContainer rmContainer,
       Resource clusterResource, SchedulingMode schedulingMode) {
-    // Do we still need this reservation?
-    Priority priority = rmContainer.getReservedPriority();
-    if (getTotalRequiredResources(priority) == 0) {
-      // Release
-      return new CSAssignment(this, rmContainer);
-    }
+    ContainerAllocation result =
+        containerAllocator.allocate(clusterResource, node,
+            schedulingMode, new ResourceLimits(Resources.none()),
+            rmContainer.getReservedPriority(), rmContainer);
 
-    // Try to assign if we have sufficient resources
-    CSAssignment tmp =
-        assignContainersOnNode(clusterResource, node, priority,
-          rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-
-    // Doesn't matter... since it's already charged for at time of reservation
-    // "re-reservation" is *free*
-    CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
-    if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
-      ret.setFulfilledReservation(true);
-    }
-    return ret;
+    return getCSAssignmentFromAllocateResult(clusterResource, result);
   }
-
 }
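
Editor's note on the FiCaSchedulerApp changes above: the per-locality and per-priority logic that previously lived here (assignContainersOnNode, assignNodeLocal/RackLocal/OffSwitchContainers, assignContainer, checkHeadroom) moves into the new allocator package, and FiCaSchedulerApp is reduced to looping over priorities, delegating each one to containerAllocator.allocate(...), and translating the returned ContainerAllocation into a CSAssignment. The standalone Java sketch below illustrates only that control flow; apart from the PRIORITY_SKIPPED, APP_SKIPPED and RESERVED states, which appear in the patch, every name in it is a placeholder rather than one of the patch's actual classes.

// Simplified, self-contained sketch -- NOT the real YARN types.
import java.util.Arrays;
import java.util.List;

public class AllocatorFlowSketch {

  // Loosely mirrors AllocationState; the real enum may carry more values.
  enum State { ALLOCATED, RESERVED, PRIORITY_SKIPPED, APP_SKIPPED }

  // Stand-in for ContainerAllocation: what one allocator call returns.
  static final class Result {
    final State state;
    final int memory; // resource to be allocated or reserved, in MB
    Result(State state, int memory) { this.state = state; this.memory = memory; }
  }

  // Stand-in for ContainerAllocator#allocate at a single priority.
  interface Allocator {
    Result allocate(int priority);
  }

  // Mirrors the shape of the new FiCaSchedulerApp#assignContainers loop:
  // try priorities in order, move on when a priority is skipped, and stop at
  // the first priority that yields an allocation, a reservation, or an
  // app-level skip.
  static Result assignContainers(List<Integer> priorities, Allocator allocator) {
    for (int priority : priorities) {
      Result result = allocator.allocate(priority);
      if (result.state == State.PRIORITY_SKIPPED) {
        continue; // nothing to do at this priority, try the next one
      }
      return result;
    }
    // Every priority was skipped, so the whole app is skipped on this node.
    return new Result(State.APP_SKIPPED, 0);
  }

  public static void main(String[] args) {
    // Priority 1 has nothing to allocate; priority 2 gets a 1024 MB container.
    Allocator fake = p -> p == 1
        ? new Result(State.PRIORITY_SKIPPED, 0)
        : new Result(State.ALLOCATED, 1024);
    Result r = assignContainers(Arrays.asList(1, 2), fake);
    System.out.println(r.state + " " + r.memory + "MB"); // ALLOCATED 1024MB
  }
}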

+ 69 - 92
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
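
Editor's note on the TestLeafQueue changes below: because the allocation path now runs inside the new RegularContainerAllocator rather than through methods on a spied FiCaSchedulerApp, the old Mockito verify(app_0).allocate(...) checks no longer observe the interesting calls. The tests therefore construct plain FiCaSchedulerApp instances and assert on the CSAssignment returned by assignContainers, using the two helpers added near the top of this file's diff (verifyContainerAllocated / verifyNoContainerAllocated). The minimal self-contained sketch below shows that assertion style under stated assumptions; the Assignment type and its fields are placeholders, not YARN's CSAssignment.

// Self-contained sketch of assertion-style verification -- placeholder types only.
final class AssignmentAssertSketch {

  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  // Placeholder for the scheduler's assignment result.
  static final class Assignment {
    final long memory;        // allocated memory in MB, 0 if nothing assigned
    final int numAllocations; // number of containers allocated
    final NodeType type;      // locality of the assignment
    Assignment(long memory, int numAllocations, NodeType type) {
      this.memory = memory;
      this.numAllocations = numAllocations;
      this.type = type;
    }
  }

  // Mirrors verifyContainerAllocated: something was assigned, with the expected locality.
  static void verifyContainerAllocated(Assignment a, NodeType expected) {
    if (a.memory <= 0 || a.numAllocations <= 0 || a.type != expected) {
      throw new AssertionError("expected a " + expected + " allocation");
    }
  }

  // Mirrors verifyNoContainerAllocated: nothing was assigned at all.
  static void verifyNoContainerAllocated(Assignment a) {
    if (a.memory != 0 || a.numAllocations != 0) {
      throw new AssertionError("expected no allocation");
    }
  }

  public static void main(String[] args) {
    verifyContainerAllocated(new Assignment(1024, 1, NodeType.NODE_LOCAL),
        NodeType.NODE_LOCAL);
    verifyNoContainerAllocated(new Assignment(0, 0, NodeType.OFF_SWITCH));
    System.out.println("sketch assertions passed");
  }
}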

@@ -1489,7 +1489,7 @@ public class TestLeafQueue {
             ContainerState.COMPLETE, "",
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
-    CSAssignment assignment = a.assignContainers(clusterResource, node_0,
+    a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@@ -1498,7 +1498,20 @@ public class TestLeafQueue {
     assertEquals(0*GB, node_0.getUsedResource().getMemory());
   }
   
-  
+  private void verifyContainerAllocated(CSAssignment assignment, NodeType nodeType) {
+    Assert.assertTrue(Resources.greaterThan(resourceCalculator, null,
+        assignment.getResource(), Resources.none()));
+    Assert
+        .assertTrue(assignment.getAssignmentInformation().getNumAllocations() > 0);
+    Assert.assertEquals(nodeType, assignment.getType());
+  }
+
+  private void verifyNoContainerAllocated(CSAssignment assignment) {
+    Assert.assertTrue(Resources.equals(assignment.getResource(),
+        Resources.none()));
+    Assert
+        .assertTrue(assignment.getAssignmentInformation().getNumAllocations() == 0);
+  }
 
   @Test
   public void testLocalityScheduling() throws Exception {
@@ -1512,11 +1525,11 @@ public class TestLeafQueue {
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
-    
+
     // Setup some nodes and racks
     String host_0 = "127.0.0.1";
     String rack_0 = "rack_0";
@@ -1561,8 +1574,7 @@ public class TestLeafQueue {
     // Start with off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1570,8 +1582,7 @@ public class TestLeafQueue {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1579,8 +1590,7 @@ public class TestLeafQueue {
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
@@ -1589,26 +1599,21 @@ public class TestLeafQueue {
     // since missedOpportunities=3 and reqdContainers=3
     assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
     assertEquals(2, app_0.getTotalRequiredResources(priority));
-    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
-    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType());
@@ -1638,16 +1643,13 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
-    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.RACK_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
-    assertEquals(NodeType.RACK_LOCAL, assignment.getType());
   }
   
   @Test
@@ -1661,9 +1663,9 @@ public class TestLeafQueue {
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
     
     // Setup some nodes and racks
@@ -1723,63 +1725,48 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2,
+    CSAssignment assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(2, app_0.getTotalRequiredResources(priority_1));
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
-    a.assignContainers(clusterResource, node_2,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(2, app_0.getTotalRequiredResources(priority_1));
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
-    a.assignContainers(clusterResource, node_2,
+    assignment = a.assignContainers(clusterResource, node_2,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(1, app_0.getTotalRequiredResources(priority_1));
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, DATA_LOCAL for P1
-    a.assignContainers(clusterResource, node_0,
+    assignment = a.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(0, app_0.getTotalRequiredResources(priority_1));
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0), 
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(1, app_0.getTotalRequiredResources(priority_2));
 
     // Now, OFF_SWITCH for P2
-    a.assignContainers(clusterResource, node_1,
+    assignment = a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
-        eq(priority_1), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.OFF_SWITCH);
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
     assertEquals(0, app_0.getTotalRequiredResources(priority_1));
-    verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_1), 
-        eq(priority_2), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_2));
     assertEquals(0, app_0.getTotalRequiredResources(priority_2));
 
@@ -1798,8 +1785,8 @@ public class TestLeafQueue {
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
     
     // Setup some nodes and racks
@@ -1849,19 +1836,17 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     
     // NODE_LOCAL - node_0_1
-    a.assignContainers(clusterResource, node_0_0,
+    CSAssignment assignment = a.assignContainers(clusterResource, node_0_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
 
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
-    a.assignContainers(clusterResource, node_1_0,
+    assignment = a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
                                                                // since #req=0
     assertEquals(0, app_0.getTotalRequiredResources(priority));
@@ -1875,21 +1860,18 @@ public class TestLeafQueue {
 
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
-    a.assignContainers(clusterResource, node_0_1,
+    assignment = a.assignContainers(clusterResource, node_0_1,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     
     // NODE_LOCAL - node_1
-    a.assignContainers(clusterResource, node_1_0,
+    assignment = a.assignContainers(clusterResource, node_1_0,
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
-
   }
 
   @Test (timeout = 30000)
@@ -2067,16 +2049,16 @@ public class TestLeafQueue {
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
-    FiCaSchedulerApp app_0 = 
-        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
-    FiCaSchedulerApp app_1 = 
-        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+            mock(ActiveUsersManager.class), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);
 
     // Setup some nodes and racks
@@ -2136,10 +2118,10 @@ public class TestLeafQueue {
     
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
-    a.assignContainers(clusterResource, node_0_1, 
-        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    CSAssignment assignment =
+        a.assignContainers(clusterResource, node_0_1, new ResourceLimits(
+            clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
     
     // resourceName: <priority, memory, #containers, relaxLocality>
@@ -2159,10 +2141,9 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
-    a.assignContainers(clusterResource, node_1_1, 
+    assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
     
     // Allow rack-locality for rack_1, but blacklist node_1_1
@@ -2190,10 +2171,9 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, 
+    assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
 
     // Now, remove node_1_1 from blacklist, but add rack_1 to blacklist
@@ -2219,10 +2199,9 @@ public class TestLeafQueue {
 
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
-    a.assignContainers(clusterResource, node_1_1, 
+    assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
     
     // Now remove rack_1 from blacklist
@@ -2246,10 +2225,9 @@ public class TestLeafQueue {
     // Blacklist: < host_0_0 >       <----
 
     // Now, should allocate since RR(rack_1) = relax: true
-    a.assignContainers(clusterResource, node_1_1, 
+    assignment = a.assignContainers(clusterResource, node_1_1, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyNoContainerAllocated(assignment);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
     assertEquals(1, app_0.getTotalRequiredResources(priority));
 
@@ -2277,10 +2255,9 @@ public class TestLeafQueue {
     // host_1_0: 8G
     // host_1_1: 7G
 
-    a.assignContainers(clusterResource, node_1_0, 
+    assignment = a.assignContainers(clusterResource, node_1_0, 
         new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
-        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
     assertEquals(0, app_0.getTotalRequiredResources(priority));