Explorar o código

YARN-1015. FS should watch node resource utilization and allocate opportunistic containers if appropriate.

Haibo Chen %!s(int64=7) %!d(string=hai) anos
pai
achega
86555164c3
Modificáronse 24 ficheiros con 869 adicións e 214 borrados
  1. 2 2
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
  2. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 48 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  5. 103 72
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  6. 43 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  7. 32 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  8. 22 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
  9. 72 33
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  10. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
  11. 14 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
  12. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
  13. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
  15. 53 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  16. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
  17. 7 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
  18. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
  19. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
  20. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
  21. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
  22. 421 47
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  23. 8 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java
  24. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/fairscheduler/TestRMWebServicesFairSchedulerCustomResourceTypes.java

+ 2 - 2
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java

@@ -75,7 +75,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
       case DEMAND:
       case DEMAND:
         return schedulable.getDemand().getMemorySize();
         return schedulable.getDemand().getMemorySize();
       case USAGE:
       case USAGE:
-        return schedulable.getResourceUsage().getMemorySize();
+        return schedulable.getGuaranteedResourceUsage().getMemorySize();
       case MINSHARE:
       case MINSHARE:
         return schedulable.getMinShare().getMemorySize();
         return schedulable.getMinShare().getMemorySize();
       case MAXSHARE:
       case MAXSHARE:
@@ -96,7 +96,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
       case DEMAND:
       case DEMAND:
         return schedulable.getDemand().getVirtualCores();
         return schedulable.getDemand().getVirtualCores();
       case USAGE:
       case USAGE:
-        return schedulable.getResourceUsage().getVirtualCores();
+        return schedulable.getGuaranteedResourceUsage().getVirtualCores();
       case MINSHARE:
       case MINSHARE:
         return schedulable.getMinShare().getVirtualCores();
         return schedulable.getMinShare().getVirtualCores();
       case MAXSHARE:
       case MAXSHARE:

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -278,6 +278,11 @@ public class YarnConfiguration extends Configuration {
 
 
   public static final String APP_NAME_PLACEMENT_RULE = "app-name";
   public static final String APP_NAME_PLACEMENT_RULE = "app-name";
 
 
+  public static final String RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED =
+      RM_PREFIX + "scheduler.oversubscription.enabled";
+  public static final boolean DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED
+      = false;
+
   /** Enable Resource Manager webapp ui actions */
   /** Enable Resource Manager webapp ui actions */
   public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
   public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
     RM_PREFIX + "webapp.ui-actions.enabled";
     RM_PREFIX + "webapp.ui-actions.enabled";

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -428,6 +428,19 @@
     <value>false</value>
     <value>false</value>
   </property>
   </property>
 
 
+  <property>
+    <description>
+      If set to true, the scheduler will try to over-allocate resources on the
+      nodes that allow overallocation. To enable overallocatin on a node, set
+      {code}yarn.nodemanager.overallocation.memory-utilization-threshold{code}
+      and
+      {code}yarn.nodemanager.overallocation.cpu-utilization-threshold{code}
+      to a number in the range (0.0, 1.0)
+    </description>
+    <name>yarn.resourcemanager.scheduler.oversubscription.enabled</name>
+    <value>false</value>
+  </property>
+
   <property>
   <property>
     <description>Enable RM to recover state after starting. If true, then
     <description>Enable RM to recover state after starting. If true, then
       yarn.resourcemanager.store.class must be specified. </description>
       yarn.resourcemanager.store.class must be specified. </description>

+ 48 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -62,6 +64,10 @@ public abstract class SchedulerNode {
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
 
 
   private Resource capacity;
   private Resource capacity;
+  // The resource available within the node's capacity that can be given out
+  // to run GUARANTEED containers, including reserved, preempted and any
+  // remaining free resources. Resources allocated to OPPORTUNISTIC containers
+  // are tracked in allocatedResourceOpportunistic
   private Resource unallocatedResource = Resource.newInstance(0, 0);
   private Resource unallocatedResource = Resource.newInstance(0, 0);
 
 
   private RMContainer reservedContainer;
   private RMContainer reservedContainer;
@@ -663,6 +669,48 @@ public abstract class SchedulerNode {
     this.nodeAttributes = attributes;
     this.nodeAttributes = attributes;
   }
   }
 
 
+  /**
+   * Get the amount of resources that can be allocated to opportunistic
+   * containers in the case of overallocation. It is calculated as
+   * node capacity - (node utilization + resources of allocated-yet-not-started
+   * containers).
+   * @return the amount of resources that are available to be allocated to
+   *         opportunistic containers
+   */
+  public synchronized Resource allowedResourceForOverAllocation() {
+    OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo();
+    if (overAllocationInfo == null) {
+      LOG.debug("Overallocation is disabled on node: " + rmNode.getHostName());
+      return Resources.none();
+    }
+
+    ResourceUtilization projectedNodeUtilization = ResourceUtilization.
+        newInstance(getNodeUtilization());
+    // account for resources allocated in this heartbeat
+    projectedNodeUtilization.addTo(
+        (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0,
+        (float) resourceAllocatedPendingLaunch.getVirtualCores() /
+            capacity.getVirtualCores());
+
+    ResourceThresholds thresholds =
+        overAllocationInfo.getOverAllocationThresholds();
+    Resource overAllocationThreshold = Resources.createResource(
+        (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()),
+        (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold()));
+    long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize()
+        - projectedNodeUtilization.getPhysicalMemory());
+    int allowedCpu = Math.max(0, (int)
+        (overAllocationThreshold.getVirtualCores() -
+            projectedNodeUtilization.getCPU() * capacity.getVirtualCores()));
+
+    Resource resourceAllowedForOpportunisticContainers =
+        Resources.createResource(allowedMemory, allowedCpu);
+
+    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
+    // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+    return resourceAllowedForOpportunisticContainers;
+  }
+
   private static class ContainerInfo {
   private static class ContainerInfo {
     private final RMContainer container;
     private final RMContainer container;
     private boolean launchedOnNode;
     private boolean launchedOnNode;

+ 103 - 72
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -169,7 +170,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
           rmContainer.getNodeLabelExpression(),
           rmContainer.getNodeLabelExpression(),
           getUser(), 1, containerResource);
           getUser(), 1, containerResource);
       this.attemptResourceUsage.decUsed(containerResource);
       this.attemptResourceUsage.decUsed(containerResource);
-      getQueue().decUsedResource(containerResource);
+      getQueue().decUsedGuaranteedResource(containerResource);
 
 
       // Clear resource utilization metrics cache.
       // Clear resource utilization metrics cache.
       lastMemoryAggregateAllocationUpdateTime = -1;
       lastMemoryAggregateAllocationUpdateTime = -1;
@@ -178,30 +179,35 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     }
     }
   }
   }
 
 
-  private void unreserveInternal(
+  private boolean unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
     try {
     try {
       writeLock.lock();
       writeLock.lock();
       Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
       Map<NodeId, RMContainer> reservedContainers = this.reservedContainers.get(
           schedulerKey);
           schedulerKey);
-      RMContainer reservedContainer = reservedContainers.remove(
-          node.getNodeID());
-      if (reservedContainers.isEmpty()) {
-        this.reservedContainers.remove(schedulerKey);
-      }
+      boolean unreserved = false;
+      if (reservedContainers != null) {
+        RMContainer reservedContainer = reservedContainers.remove(
+            node.getNodeID());
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(schedulerKey);
+        }
 
 
-      // Reset the re-reservation count
-      resetReReservations(schedulerKey);
+        // Reset the re-reservation count
+        resetReReservations(schedulerKey);
 
 
-      Resource resource = reservedContainer.getContainer().getResource();
-      this.attemptResourceUsage.decReserved(resource);
+        Resource resource = reservedContainer.getContainer().getResource();
+        this.attemptResourceUsage.decReserved(resource);
+        unreserved = true;
 
 
-      LOG.info(
-          "Application " + getApplicationId() + " unreserved " + " on node "
-              + node + ", currently has " + reservedContainers.size()
-              + " at priority " + schedulerKey.getPriority()
-              + "; currentReservation " + this.attemptResourceUsage
-              .getReserved());
+        LOG.info(
+            "Application " + getApplicationId() + " unreserved " + " on node "
+                + node + ", currently has " + reservedContainers.size()
+                + " at priority " + schedulerKey.getPriority()
+                + "; currentReservation " + this.attemptResourceUsage
+                .getReserved());
+      }
+      return unreserved;
     } finally {
     } finally {
       writeLock.unlock();
       writeLock.unlock();
     }
     }
@@ -229,7 +235,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     SchedulingPolicy policy = fsQueue.getPolicy();
     SchedulingPolicy policy = fsQueue.getPolicy();
 
 
     Resource queueFairShare = fsQueue.getFairShare();
     Resource queueFairShare = fsQueue.getFairShare();
-    Resource queueUsage = fsQueue.getResourceUsage();
+    Resource queueUsage = fsQueue.getGuaranteedResourceUsage();
     Resource clusterResource = this.scheduler.getClusterResource();
     Resource clusterResource = this.scheduler.getClusterResource();
     Resource clusterUsage = this.scheduler.getRootQueueMetrics()
     Resource clusterUsage = this.scheduler.getRootQueueMetrics()
         .getAllocatedResources();
         .getAllocatedResources();
@@ -420,7 +426,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
 
   public RMContainer allocate(NodeType type, FSSchedulerNode node,
   public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
       SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
-      Container reservedContainer) {
+      Container reservedContainer, boolean opportunistic) {
     RMContainer rmContainer;
     RMContainer rmContainer;
     Container container;
     Container container;
 
 
@@ -445,9 +451,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
       }
 
 
       container = reservedContainer;
       container = reservedContainer;
+      ExecutionType executionType = opportunistic ?
+          ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED;
       if (container == null) {
       if (container == null) {
         container = createContainer(node, pendingAsk.getPerAllocationResource(),
         container = createContainer(node, pendingAsk.getPerAllocationResource(),
-            schedulerKey);
+            schedulerKey, executionType);
       }
       }
 
 
       // Create RMContainer
       // Create RMContainer
@@ -462,8 +470,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       // Update consumption and track allocations
       // Update consumption and track allocations
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
           type, node, schedulerKey, container);
           type, node, schedulerKey, container);
-      this.attemptResourceUsage.incUsed(container.getResource());
-      getQueue().incUsedResource(container.getResource());
+      if (executionType.equals(ExecutionType.GUARANTEED)) {
+        this.attemptResourceUsage.incUsed(container.getResource());
+        getQueue().incUsedGuaranteedResource(container.getResource());
+      } else {
+        this.attemptOpportunisticResourceUsage.incUsed(container.getResource());
+      }
 
 
       // Update resource requests related to "request" and store in RMContainer
       // Update resource requests related to "request" and store in RMContainer
       ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
       ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
@@ -620,7 +632,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
 
   private Resource getUsageAfterPreemptingContainer(Resource containerResources,
   private Resource getUsageAfterPreemptingContainer(Resource containerResources,
           Resource alreadyConsideringForPreemption) {
           Resource alreadyConsideringForPreemption) {
-    Resource usageAfterPreemption = Resources.clone(getResourceUsage());
+    Resource usageAfterPreemption =
+        Resources.clone(getGuaranteedResourceUsage());
 
 
     // Subtract resources of containers already queued for preemption
     // Subtract resources of containers already queued for preemption
     synchronized (preemptionVariablesLock) {
     synchronized (preemptionVariablesLock) {
@@ -647,7 +660,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @return Container
    * @return Container
    */
    */
   private Container createContainer(FSSchedulerNode node, Resource capability,
   private Container createContainer(FSSchedulerNode node, Resource capability,
-      SchedulerRequestKey schedulerKey) {
+      SchedulerRequestKey schedulerKey, ExecutionType executionType) {
 
 
     NodeId nodeId = node.getRMNode().getNodeID();
     NodeId nodeId = node.getRMNode().getNodeID();
     ContainerId containerId = BuilderUtils.newContainerId(
     ContainerId containerId = BuilderUtils.newContainerId(
@@ -657,7 +670,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     return BuilderUtils.newContainer(containerId, nodeId,
     return BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability,
         node.getRMNode().getHttpAddress(), capability,
         schedulerKey.getPriority(), null,
         schedulerKey.getPriority(), null,
-        schedulerKey.getAllocationRequestId());
+        executionType, schedulerKey.getAllocationRequestId());
   }
   }
 
 
   @Override
   @Override
@@ -669,7 +682,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       super.recoverContainer(node, rmContainer);
       super.recoverContainer(node, rmContainer);
 
 
       if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
       if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-        getQueue().incUsedResource(rmContainer.getContainer().getResource());
+        getQueue().incUsedGuaranteedResource(
+            rmContainer.getContainer().getResource());
       }
       }
 
 
       // If not running unmanaged, the first container we recover is always
       // If not running unmanaged, the first container we recover is always
@@ -707,7 +721,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       if (reservedContainer == null) {
       if (reservedContainer == null) {
         reservedContainer =
         reservedContainer =
             createContainer(node, perAllocationResource,
             createContainer(node, perAllocationResource,
-              schedulerKey);
+              schedulerKey, ExecutionType.GUARANTEED);
         getMetrics().reserveResource(node.getPartition(), getUser(),
         getMetrics().reserveResource(node.getPartition(), getUser(),
             reservedContainer.getResource());
             reservedContainer.getResource());
         RMContainer rmContainer =
         RMContainer rmContainer =
@@ -764,11 +778,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   public void unreserve(SchedulerRequestKey schedulerKey,
   public void unreserve(SchedulerRequestKey schedulerKey,
       FSSchedulerNode node) {
       FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
     RMContainer rmContainer = node.getReservedContainer();
-    unreserveInternal(schedulerKey, node);
-    node.unreserveResource(this);
-    clearReservation(node);
-    getMetrics().unreserveResource(node.getPartition(),
-        getUser(), rmContainer.getContainer().getResource());
+    if (unreserveInternal(schedulerKey, node)) {
+      node.unreserveResource(this);
+      clearReservation(node);
+      getMetrics().unreserveResource(node.getPartition(),
+          getUser(), rmContainer.getContainer().getResource());
+    }
   }
   }
 
 
   private void setReservation(SchedulerNode node) {
   private void setReservation(SchedulerNode node) {
@@ -842,13 +857,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    */
    */
   private Resource assignContainer(
   private Resource assignContainer(
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
-      boolean reserved, SchedulerRequestKey schedulerKey) {
+      boolean reserved, boolean opportunistic,
+      SchedulerRequestKey schedulerKey) {
 
 
     // How much does this request need?
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();
     Resource capability = pendingAsk.getPerAllocationResource();
 
 
     // How much does the node have?
     // How much does the node have?
-    Resource available = node.getUnallocatedResource();
+    Resource available = opportunistic ?
+        node.allowedResourceForOverAllocation() : node.getUnallocatedResource();
 
 
     Container reservedContainer = null;
     Container reservedContainer = null;
     if (reserved) {
     if (reserved) {
@@ -860,39 +877,46 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       // Inform the application of the new container for this request
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
       RMContainer allocatedContainer =
           allocate(type, node, schedulerKey, pendingAsk,
           allocate(type, node, schedulerKey, pendingAsk,
-              reservedContainer);
-      if (allocatedContainer == null) {
-        // Did the application need this resource?
-        if (reserved) {
+              reservedContainer, opportunistic);
+
+      // delete the previous reservation, if any
+      if (reserved) {
+        unreserve(schedulerKey, node);
+      }
+
+      if (allocatedContainer != null) {
+        if (opportunistic) {
+          // if an OPPORTUNISTIC container is allocated, we need to
+          // unreserve anything that we may have reserved in our
+          // previous attempt to assign GUARANTEED containers for this
+          // scheduling request.
           unreserve(schedulerKey, node);
           unreserve(schedulerKey, node);
         }
         }
+
+
+        // Inform the node
+        node.allocateContainer(allocatedContainer);
+
+        // If not running unmanaged, the first container we allocate
+        // is always the AM. Set amResource for this app and update
+        // the leaf queue's AM usage
+        if (!isAmRunning() && !getUnmanagedAM()) {
+          setAMResource(capability);
+          getQueue().addAMResourceUsage(capability);
+          setAmRunning(true);
+        }
+
+        return capability;
+      } else {
         if (LOG.isDebugEnabled()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format(
           LOG.debug(String.format(
               "Resource ask %s fits in available node resources %s, " +
               "Resource ask %s fits in available node resources %s, " +
-                      "but no container was allocated",
+                  "but no container was allocated",
               capability, available));
               capability, available));
         }
         }
-        return Resources.none();
-      }
 
 
-      // If we had previously made a reservation, delete it
-      if (reserved) {
-        unreserve(schedulerKey, node);
-      }
-
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
-
-      // If not running unmanaged, the first container we allocate is always
-      // the AM. Set the amResource for this app and update the leaf queue's AM
-      // usage
-      if (!isAmRunning() && !getUnmanagedAM()) {
-        setAMResource(capability);
-        getQueue().addAMResourceUsage(capability);
-        setAmRunning(true);
+        return Resources.none();
       }
       }
-
-      return capability;
     }
     }
 
 
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
@@ -903,7 +927,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // The desired container won't fit here, so reserve
     // The desired container won't fit here, so reserve
     // Reserve only, if app does not wait for preempted resources on the node,
     // Reserve only, if app does not wait for preempted resources on the node,
     // otherwise we may end up with duplicate reservations
     // otherwise we may end up with duplicate reservations
-    if (isReservable(capability) &&
+    if (isReservable(capability) && !opportunistic &&
         !node.isPreemptedForApp(this) &&
         !node.isPreemptedForApp(this) &&
         reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
         reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
             type, schedulerKey)) {
             type, schedulerKey)) {
@@ -949,7 +973,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
   }
 
 
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
-  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+  private Resource assignContainer(FSSchedulerNode node, boolean opportunistic,
+                                   boolean reserved) {
     if (LOG.isTraceEnabled()) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
       LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
     }
     }
@@ -1012,7 +1037,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
                 + ", app attempt id: " + this.attemptId);
                 + ", app attempt id: " + this.attemptId);
           }
           }
           return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
           return assignContainer(node, nodeLocalPendingAsk, NodeType.NODE_LOCAL,
-              reserved, schedulerKey);
+              reserved, opportunistic, schedulerKey);
         }
         }
 
 
         if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
         if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
@@ -1029,7 +1054,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
                 + ", app attempt id: " + this.attemptId);
                 + ", app attempt id: " + this.attemptId);
           }
           }
           return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
           return assignContainer(node, rackLocalPendingAsk, NodeType.RACK_LOCAL,
-              reserved, schedulerKey);
+              reserved, opportunistic, schedulerKey);
         }
         }
 
 
         PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
         PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
@@ -1049,7 +1074,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
                   + ", app attempt id: " + this.attemptId);
                   + ", app attempt id: " + this.attemptId);
             }
             }
             return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
             return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
-                reserved, schedulerKey);
+                reserved, opportunistic, schedulerKey);
           }
           }
         }
         }
 
 
@@ -1150,7 +1175,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // there's only one container size per priority.
     // there's only one container size per priority.
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
         node.getUnallocatedResource())) {
         node.getUnallocatedResource())) {
-      assignContainer(node, true);
+      assignContainer(node, false, true);
     }
     }
     return true;
     return true;
   }
   }
@@ -1166,7 +1191,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     Resource fairDemand = Resources.componentwiseMin(threshold, demand);
     Resource fairDemand = Resources.componentwiseMin(threshold, demand);
 
 
     // Check if the queue is starved for fairshare
     // Check if the queue is starved for fairshare
-    boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
+    boolean starved =
+        isUsageBelowShare(getGuaranteedResourceUsage(), fairDemand);
 
 
     if (!starved) {
     if (!starved) {
       lastTimeAtFairShare = now;
       lastTimeAtFairShare = now;
@@ -1178,8 +1204,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       fairshareStarvation = Resources.none();
       fairshareStarvation = Resources.none();
     } else {
     } else {
       // The app has been starved for longer than preemption-timeout.
       // The app has been starved for longer than preemption-timeout.
-      fairshareStarvation =
-          Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
+      fairshareStarvation = Resources.subtractFromNonNegative(fairDemand,
+          getGuaranteedResourceUsage());
     }
     }
     return fairshareStarvation;
     return fairshareStarvation;
   }
   }
@@ -1198,7 +1224,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * @return true if the app is starved for fairshare, false otherwise
    * @return true if the app is starved for fairshare, false otherwise
    */
    */
   boolean isStarvedForFairShare() {
   boolean isStarvedForFairShare() {
-    return isUsageBelowShare(getResourceUsage(), getFairShare());
+    return isUsageBelowShare(getGuaranteedResourceUsage(), getFairShare());
   }
   }
 
 
   /**
   /**
@@ -1299,7 +1325,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
    * Get the current app's unsatisfied demand.
    * Get the current app's unsatisfied demand.
    */
    */
   Resource getPendingDemand() {
   Resource getPendingDemand() {
-    return Resources.subtract(demand, getResourceUsage());
+    return Resources.subtract(demand, getGuaranteedResourceUsage());
   }
   }
 
 
   @Override
   @Override
@@ -1318,10 +1344,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
   }
 
 
   @Override
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return getCurrentConsumption();
     return getCurrentConsumption();
   }
   }
 
 
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    return attemptOpportunisticResourceUsage.getUsed();
+  }
+
   @Override
   @Override
   public float getWeight() {
   public float getWeight() {
     float weight = 1.0F;
     float weight = 1.0F;
@@ -1371,7 +1402,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
   }
 
 
   @Override
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     if (isOverAMShareLimit()) {
     if (isOverAMShareLimit()) {
       PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
       PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
       updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
       updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
@@ -1383,7 +1414,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       }
       }
       return Resources.none();
       return Resources.none();
     }
     }
-    return assignContainer(node, false);
+    return assignContainer(node, opportunistic, false);
   }
   }
 
 
   /**
   /**

+ 43 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -97,7 +97,7 @@ public class FSLeafQueue extends FSQueue {
       // when an appAttempt is created for an application, we'd like to move
       // when an appAttempt is created for an application, we'd like to move
       // it over from assignedApps to either runnableApps or nonRunnableApps
       // it over from assignedApps to either runnableApps or nonRunnableApps
       assignedApps.remove(app.getApplicationId());
       assignedApps.remove(app.getApplicationId());
-      incUsedResource(app.getResourceUsage());
+      incUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     } finally {
     } finally {
       writeLock.unlock();
       writeLock.unlock();
     }
     }
@@ -132,7 +132,7 @@ public class FSLeafQueue extends FSQueue {
       getMetrics().setAMResourceUsage(amResourceUsage);
       getMetrics().setAMResourceUsage(amResourceUsage);
     }
     }
 
 
-    decUsedResource(app.getResourceUsage());
+    decUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     return runnable;
     return runnable;
   }
   }
 
 
@@ -302,6 +302,42 @@ public class FSLeafQueue extends FSQueue {
     return demand;
     return demand;
   }
   }
 
 
+  @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   Resource getAmResourceUsage() {
   Resource getAmResourceUsage() {
     return amResourceUsage;
     return amResourceUsage;
   }
   }
@@ -335,14 +371,14 @@ public class FSLeafQueue extends FSQueue {
   }
   }
 
 
   @Override
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     Resource assigned = none();
     Resource assigned = none();
     if (LOG.isDebugEnabled()) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
           getName() + " fairShare: " + getFairShare());
           getName() + " fairShare: " + getFairShare());
     }
     }
 
 
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       return assigned;
       return assigned;
     }
     }
 
 
@@ -350,7 +386,7 @@ public class FSLeafQueue extends FSQueue {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
         continue;
       }
       }
-      assigned = sched.assignContainer(node);
+      assigned = sched.assignContainer(node, opportunistic);
       if (!assigned.equals(none())) {
       if (!assigned.equals(none())) {
         if (LOG.isDebugEnabled()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigned container in queue:" + getName() + " " +
           LOG.debug("Assigned container in queue:" + getName() + " " +
@@ -557,7 +593,7 @@ public class FSLeafQueue extends FSQueue {
     Resource starvation =
     Resource starvation =
         Resources.componentwiseMin(getMinShare(), getDemand());
         Resources.componentwiseMin(getMinShare(), getDemand());
 
 
-    Resources.subtractFromNonNegative(starvation, getResourceUsage());
+    Resources.subtractFromNonNegative(starvation, getGuaranteedResourceUsage());
 
 
     boolean starved = !Resources.isNone(starvation);
     boolean starved = !Resources.isNone(starvation);
     long now = scheduler.getClock().getTime();
     long now = scheduler.getClock().getTime();
@@ -616,7 +652,7 @@ public class FSLeafQueue extends FSQueue {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", Demand: " + getDemand() +
         ", Runnable: " + getNumRunnableApps() +
         ", Runnable: " + getNumRunnableApps() +
         ", NumPendingApps: " + getNumPendingApps() +
         ", NumPendingApps: " + getNumPendingApps() +

+ 32 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -119,6 +119,34 @@ public class FSParentQueue extends FSQueue {
   }
   }
 
 
   @Override
   @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(guaranteedResource, child.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(opportunisticResource,
+            child.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   public void updateDemand() {
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
     // Limit demand to maxResources
@@ -177,11 +205,11 @@ public class FSParentQueue extends FSQueue {
   }
   }
 
 
   @Override
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     Resource assigned = Resources.none();
     Resource assigned = Resources.none();
 
 
     // If this queue is over its limit, reject
     // If this queue is over its limit, reject
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assign container precheck on node " + node + " failed");
         LOG.debug("Assign container precheck on node " + node + " failed");
       }
       }
@@ -201,7 +229,7 @@ public class FSParentQueue extends FSQueue {
     try {
     try {
       sortedChildQueues.addAll(childQueues);
       sortedChildQueues.addAll(childQueues);
       for (FSQueue child : sortedChildQueues) {
       for (FSQueue child : sortedChildQueues) {
-        assigned = child.assignContainer(node);
+        assigned = child.assignContainer(node, opportunistic);
         if (!Resources.equals(assigned, Resources.none())) {
         if (!Resources.equals(assigned, Resources.none())) {
           break;
           break;
         }
         }
@@ -285,7 +313,7 @@ public class FSParentQueue extends FSQueue {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", Guaranteed ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", Demand: " + getDemand() +
         ", MaxAMShare: " + maxAMShare +
         ", MaxAMShare: " + maxAMShare +
         ", Runnable: " + getNumRunnableApps() +
         ", Runnable: " + getNumRunnableApps() +

+ 22 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -57,7 +57,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private Resource fairShare = Resources.createResource(0, 0);
   private Resource fairShare = Resources.createResource(0, 0);
   private Resource steadyFairShare = Resources.createResource(0, 0);
   private Resource steadyFairShare = Resources.createResource(0, 0);
   private Resource reservedResource = Resources.createResource(0, 0);
   private Resource reservedResource = Resources.createResource(0, 0);
-  private final Resource resourceUsage = Resource.newInstance(0, 0);
+  private final Resource guaranteedResourceUsage = Resource.newInstance(0, 0);
   private final String name;
   private final String name;
   protected final FairScheduler scheduler;
   protected final FairScheduler scheduler;
   private final YarnAuthorizationProvider authorizer;
   private final YarnAuthorizationProvider authorizer;
@@ -235,7 +235,8 @@ public abstract class FSQueue implements Queue, Schedulable {
     if (getFairShare().getMemorySize() == 0) {
     if (getFairShare().getMemorySize() == 0) {
       queueInfo.setCurrentCapacity(0.0f);
       queueInfo.setCurrentCapacity(0.0f);
     } else {
     } else {
-      queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
+      queueInfo.setCurrentCapacity(
+          (float) getGuaranteedResourceUsage().getMemorySize() /
           getFairShare().getMemorySize());
           getFairShare().getMemorySize());
     }
     }
 
 
@@ -419,14 +420,17 @@ public abstract class FSQueue implements Queue, Schedulable {
    * 
    * 
    * @return true if check passes (can assign) or false otherwise
    * @return true if check passes (can assign) or false otherwise
    */
    */
-  boolean assignContainerPreCheck(FSSchedulerNode node) {
-    if (node.getReservedContainer() != null) {
+  boolean assignContainerPreCheck(FSSchedulerNode node, boolean opportunistic) {
+    if (opportunistic) {
+      // always pre-approve OPPORTUNISTIC containers to be assigned on the node
+      return true;
+    } else if (node.getReservedContainer() != null) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because it has reserved containers.");
             + " because it has reserved containers.");
       }
       }
       return false;
       return false;
-    } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
+    } else if (!Resources.fitsIn(getGuaranteedResourceUsage(), getMaxShare())) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because queue resource usage is larger than MaxShare: "
             + " because queue resource usage is larger than MaxShare: "
@@ -449,7 +453,8 @@ public abstract class FSQueue implements Queue, Schedulable {
   @Override
   @Override
   public String toString() {
   public String toString() {
     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+        getName(), getDemand(), getGuaranteedResourceUsage(), fairShare,
+        getWeight());
   }
   }
   
   
   @Override
   @Override
@@ -481,8 +486,8 @@ public abstract class FSQueue implements Queue, Schedulable {
   }
   }
 
 
   @Override
   @Override
-  public Resource getResourceUsage() {
-    return resourceUsage;
+  public Resource getGuaranteedResourceUsage() {
+    return guaranteedResourceUsage;
   }
   }
 
 
   /**
   /**
@@ -490,11 +495,11 @@ public abstract class FSQueue implements Queue, Schedulable {
    *
    *
    * @param res the resource to increase
    * @param res the resource to increase
    */
    */
-  protected void incUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.addTo(resourceUsage, res);
+  protected void incUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.addTo(guaranteedResourceUsage, res);
       if (parent != null) {
       if (parent != null) {
-        parent.incUsedResource(res);
+        parent.incUsedGuaranteedResource(res);
       }
       }
     }
     }
   }
   }
@@ -504,11 +509,11 @@ public abstract class FSQueue implements Queue, Schedulable {
    *
    *
    * @param res the resource to decrease
    * @param res the resource to decrease
    */
    */
-  protected void decUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.subtractFrom(resourceUsage, res);
+  protected void decUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.subtractFrom(guaranteedResourceUsage, res);
       if (parent != null) {
       if (parent != null) {
-        parent.decUsedResource(res);
+        parent.decUsedGuaranteedResource(res);
       }
       }
     }
     }
   }
   }
@@ -521,7 +526,7 @@ public abstract class FSQueue implements Queue, Schedulable {
 
 
   boolean fitsInMaxShare(Resource additionalResource) {
   boolean fitsInMaxShare(Resource additionalResource) {
     Resource usagePlusAddition =
     Resource usagePlusAddition =
-        Resources.add(getResourceUsage(), additionalResource);
+        Resources.add(getGuaranteedResourceUsage(), additionalResource);
 
 
     if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
     if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {

+ 72 - 33
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -173,7 +173,6 @@ public class FairScheduler extends
 
 
   private float reservableNodesRatio; // percentage of available nodes
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
                                       // an app can be reserved on
-
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   // Continuous Scheduling enabled or not
   // Continuous Scheduling enabled or not
   @Deprecated
   @Deprecated
@@ -196,6 +195,8 @@ public class FairScheduler extends
   boolean maxAssignDynamic;
   boolean maxAssignDynamic;
   protected int maxAssign; // Max containers to assign per heartbeat
   protected int maxAssign; // Max containers to assign per heartbeat
 
 
+  protected boolean oversubscriptionEnabled;
+
   @VisibleForTesting
   @VisibleForTesting
   final MaxRunningAppsEnforcer maxRunningEnforcer;
   final MaxRunningAppsEnforcer maxRunningEnforcer;
 
 
@@ -1075,13 +1076,13 @@ public class FairScheduler extends
    * resources for preempted containers.
    * resources for preempted containers.
    * @param node Node to check
    * @param node Node to check
    */
    */
-  static void assignPreemptedContainers(FSSchedulerNode node) {
+  static void attemptToAssignPreemptedResources(FSSchedulerNode node) {
     for (Entry<FSAppAttempt, Resource> entry :
     for (Entry<FSAppAttempt, Resource> entry :
         node.getPreemptionList().entrySet()) {
         node.getPreemptionList().entrySet()) {
       FSAppAttempt app = entry.getKey();
       FSAppAttempt app = entry.getKey();
       Resource preemptionPending = Resources.clone(entry.getValue());
       Resource preemptionPending = Resources.clone(entry.getValue());
       while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
       while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
-        Resource assigned = app.assignContainer(node);
+        Resource assigned = app.assignContainer(node, false);
         if (Resources.isNone(assigned) ||
         if (Resources.isNone(assigned) ||
             assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
             assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
           // Fail to assign, let's not try further
           // Fail to assign, let's not try further
@@ -1113,49 +1114,85 @@ public class FairScheduler extends
       // Assign new containers...
       // Assign new containers...
       // 1. Ensure containers are assigned to the apps that preempted
       // 1. Ensure containers are assigned to the apps that preempted
       // 2. Check for reserved applications
       // 2. Check for reserved applications
-      // 3. Schedule if there are no reservations
+      // 3. Schedule GUARANTEED containers if there are no reservations
+      // 4. Schedule OPPORTUNISTIC containers if possible
 
 
       // Apps may wait for preempted containers
       // Apps may wait for preempted containers
       // We have to satisfy these first to avoid cases, when we preempt
       // We have to satisfy these first to avoid cases, when we preempt
       // a container for A from B and C gets the preempted containers,
       // a container for A from B and C gets the preempted containers,
       // when C does not qualify for preemption itself.
       // when C does not qualify for preemption itself.
-      assignPreemptedContainers(node);
-      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-      boolean validReservation = false;
-      if (reservedAppSchedulable != null) {
-        validReservation = reservedAppSchedulable.assignReservedContainer(node);
-      }
+      attemptToAssignPreemptedResources(node);
+
+      boolean validReservation =  attemptToAssignReservedResources(node);
       if (!validReservation) {
       if (!validReservation) {
-        // No reservation, schedule at queue which is farthest below fair share
-        int assignedContainers = 0;
-        Resource assignedResource = Resources.clone(Resources.none());
-        Resource maxResourcesToAssign = Resources.multiply(
-            node.getUnallocatedResource(), 0.5f);
-
-        while (node.getReservedContainer() == null) {
-          Resource assignment = queueMgr.getRootQueue().assignContainer(node);
-
-          if (assignment.equals(Resources.none())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("No container is allocated on node " + node);
-            }
-            break;
-          }
+        // only attempt to assign GUARANTEED containers if there is no
+        // reservation on the node because
+        attemptToAssignResourcesAsGuaranteedContainers(node);
+      }
 
 
-          assignedContainers++;
-          Resources.addTo(assignedResource, assignment);
-          if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
-              assignedResource)) {
-            break;
-          }
-        }
+      // attempt to assign OPPORTUNISTIC containers regardless of whether
+      // we have made a reservation or assigned a GUARANTEED container
+      if (oversubscriptionEnabled) {
+        attemptToAssignResourcesAsOpportunisticContainers(node);
       }
       }
+
       updateRootQueueMetrics();
       updateRootQueueMetrics();
     } finally {
     } finally {
       writeLock.unlock();
       writeLock.unlock();
     }
     }
   }
   }
 
 
+  /**
+   * Assign the reserved resource to the application that have reserved it.
+   */
+  private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
+    boolean success = false;
+    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+    if (reservedAppSchedulable != null) {
+      success = reservedAppSchedulable.assignReservedContainer(node);
+    }
+    return success;
+  }
+
+  private void attemptToAssignResourcesAsGuaranteedContainers(
+      FSSchedulerNode node) {
+    // No reservation, schedule at queue which is farthest below fair share
+    int assignedContainers = 0;
+    Resource assignedResource = Resources.clone(Resources.none());
+    Resource maxResourcesToAssign = Resources.multiply(
+        node.getUnallocatedResource(), 0.5f);
+    while (node.getReservedContainer() == null) {
+      Resource assignment =
+          queueMgr.getRootQueue().assignContainer(node, false);
+      if (assignment.equals(Resources.none())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No container is allocated on node " + node);
+        }
+        break;
+      }
+      assignedContainers++;
+      Resources.addTo(assignedResource, assignment);
+
+      if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+          assignedResource)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Try to assign OPPORTUNISTIC containers as long as there is resources
+   * to.
+   * @param node the node to assign OPPORTUNISTIC containers on
+   */
+  private void attemptToAssignResourcesAsOpportunisticContainers(
+      FSSchedulerNode node) {
+    while (!Resources.none().equals(
+        queueMgr.getRootQueue().assignContainer(node, true))) {
+      // nothing to do here
+    }
+  }
+
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
     return super.getApplicationAttempt(appAttemptId);
   }
   }
@@ -1398,6 +1435,7 @@ public class FairScheduler extends
       sizeBasedWeight = this.conf.getSizeBasedWeight();
       sizeBasedWeight = this.conf.getSizeBasedWeight();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       reservableNodesRatio = this.conf.getReservableNodes();
       reservableNodesRatio = this.conf.getReservableNodes();
+      oversubscriptionEnabled = this.conf.isOversubscriptionEnabled();
 
 
       updateInterval = this.conf.getUpdateInterval();
       updateInterval = this.conf.getUpdateInterval();
       if (updateInterval < 0) {
       if (updateInterval < 0) {
@@ -1801,7 +1839,8 @@ public class FairScheduler extends
       }
       }
       
       
       // maxShare
       // maxShare
-      if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+      if (!Resources.fitsIn(
+          Resources.add(cur.getGuaranteedResourceUsage(), consumption),
           cur.getMaxShare())) {
           cur.getMaxShare())) {
         throw new YarnException("Moving app attempt " + appAttId + " to queue "
         throw new YarnException("Moving app attempt " + appAttId + " to queue "
             + queueName + " would violate queue maxShare constraints on"
             + queueName + " would violate queue maxShare constraints on"

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -348,6 +348,11 @@ public class FairSchedulerConfiguration extends Configuration {
         DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
         DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
   }
   }
 
 
+  public boolean isOversubscriptionEnabled() {
+    return getBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED);
+  }
+
   /**
   /**
    * Delay in milliseconds for locality fallback node to rack.
    * Delay in milliseconds for locality fallback node to rack.
    * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation
    * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation

+ 14 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java

@@ -58,8 +58,17 @@ public interface Schedulable {
    */
    */
   Resource getDemand();
   Resource getDemand();
 
 
-  /** Get the aggregate amount of resources consumed by the schedulable. */
-  Resource getResourceUsage();
+  /**
+   * Get the aggregate amount of guaranteed resources consumed by the
+   * schedulable.
+   */
+  Resource getGuaranteedResourceUsage();
+
+  /**
+   * Get the aggregate amount of opportunistic resources consumed by the
+   * schedulable.
+   */
+  Resource getOpportunisticResourceUsage();
 
 
   /** Minimum Resource share assigned to the schedulable. */
   /** Minimum Resource share assigned to the schedulable. */
   Resource getMinShare();
   Resource getMinShare();
@@ -89,8 +98,10 @@ public interface Schedulable {
   /**
   /**
    * Assign a container on this node if possible, and return the amount of
    * Assign a container on this node if possible, and return the amount of
    * resources assigned.
    * resources assigned.
+   * @param node the node to assign containers on
+   * @param opportunistic whether to assign OPPORTUNISTIC containers or not
    */
    */
-  Resource assignContainer(FSSchedulerNode node);
+  Resource assignContainer(FSSchedulerNode node, boolean opportunistic);
 
 
   /** Get the fair share assigned to this Schedulable. */
   /** Get the fair share assigned to this Schedulable. */
   Resource getFairShare();
   Resource getFairShare();

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java

@@ -169,8 +169,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
       extends DominantResourceFairnessComparator {
       extends DominantResourceFairnessComparator {
     @Override
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
     public int compare(Schedulable s1, Schedulable s2) {
-      Resource usage1 = s1.getResourceUsage();
-      Resource usage2 = s2.getResourceUsage();
+      Resource usage1 = s1.getGuaranteedResourceUsage();
+      Resource usage2 = s2.getGuaranteedResourceUsage();
       Resource minShare1 = s1.getMinShare();
       Resource minShare1 = s1.getMinShare();
       Resource minShare2 = s2.getMinShare();
       Resource minShare2 = s2.getMinShare();
       Resource clusterCapacity = fsContext.getClusterResource();
       Resource clusterCapacity = fsContext.getClusterResource();
@@ -370,9 +370,9 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
     @Override
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
     public int compare(Schedulable s1, Schedulable s2) {
       ResourceInformation[] resourceInfo1 =
       ResourceInformation[] resourceInfo1 =
-          s1.getResourceUsage().getResources();
+          s1.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] resourceInfo2 =
       ResourceInformation[] resourceInfo2 =
-          s2.getResourceUsage().getResources();
+          s2.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
       ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
       ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
       ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
       ResourceInformation[] clusterInfo =
       ResourceInformation[] clusterInfo =

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java

@@ -93,8 +93,8 @@ public class FairSharePolicy extends SchedulingPolicy {
       Resource resourceUsage2 = null;
       Resource resourceUsage2 = null;
 
 
       if (res == 0) {
       if (res == 0) {
-        resourceUsage1 = s1.getResourceUsage();
-        resourceUsage2 = s2.getResourceUsage();
+        resourceUsage1 = s1.getGuaranteedResourceUsage();
+        resourceUsage2 = s2.getGuaranteedResourceUsage();
         res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
         res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
       }
       }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java

@@ -88,7 +88,7 @@ public class FairSchedulerQueueInfo {
     amMaxResources = new ResourceInfo(Resource.newInstance(
     amMaxResources = new ResourceInfo(Resource.newInstance(
         queue.getMetrics().getMaxAMShareMB(),
         queue.getMetrics().getMaxAMShareMB(),
         queue.getMetrics().getMaxAMShareVCores()));
         queue.getMetrics().getMaxAMShareVCores()));
-    usedResources = new ResourceInfo(queue.getResourceUsage());
+    usedResources = new ResourceInfo(queue.getGuaranteedResourceUsage());
     demandResources = new ResourceInfo(queue.getDemand());
     demandResources = new ResourceInfo(queue.getDemand());
     fractionMemUsed = (float)usedResources.getMemorySize() /
     fractionMemUsed = (float)usedResources.getMemorySize() /
         clusterResources.getMemorySize();
         clusterResources.getMemorySize();

+ 53 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -18,7 +18,6 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 import java.util.Set;
 import java.util.Set;
@@ -105,7 +104,10 @@ public class MockNodes {
     return rs;
     return rs;
   }
   }
 
 
-  private static class MockRMNodeImpl implements RMNode {
+  /**
+   * A mock implementation of RMNode.
+   */
+  public static class MockRMNodeImpl implements RMNode {
     private NodeId nodeId;
     private NodeId nodeId;
     private String hostName;
     private String hostName;
     private String nodeAddr;
     private String nodeAddr;
@@ -120,12 +122,27 @@ public class MockNodes {
     private ResourceUtilization containersUtilization;
     private ResourceUtilization containersUtilization;
     private ResourceUtilization nodeUtilization;
     private ResourceUtilization nodeUtilization;
     private Resource physicalResource;
     private Resource physicalResource;
+    private OverAllocationInfo overAllocationInfo;
+    private List<UpdatedContainerInfo> containerUpdates =
+        Collections.EMPTY_LIST;
+
+    public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+        Resource perNode, String rackName, String healthReport,
+        long lastHealthReportTime, int cmdPort, String hostName,
+        NodeState state, Set<String> labels,
+        ResourceUtilization containersUtilization,
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+      this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
+          lastHealthReportTime, cmdPort, hostName, state, labels,
+          containersUtilization, nodeUtilization, pPhysicalResource, null);
+    }
 
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         Resource perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState state,
         Set<String> labels, ResourceUtilization containersUtilization,
         Set<String> labels, ResourceUtilization containersUtilization,
-        ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource,
+        OverAllocationInfo overAllocationInfo) {
       this.nodeId = nodeId;
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
       this.nodeAddr = nodeAddr;
       this.httpAddress = httpAddress;
       this.httpAddress = httpAddress;
@@ -140,6 +157,7 @@ public class MockNodes {
       this.containersUtilization = containersUtilization;
       this.containersUtilization = containersUtilization;
       this.nodeUtilization = nodeUtilization;
       this.nodeUtilization = nodeUtilization;
       this.physicalResource = pPhysicalResource;
       this.physicalResource = pPhysicalResource;
+      this.overAllocationInfo = overAllocationInfo;
     }
     }
 
 
     @Override
     @Override
@@ -228,7 +246,7 @@ public class MockNodes {
 
 
     @Override
     @Override
     public List<UpdatedContainerInfo> pullContainerUpdates() {
     public List<UpdatedContainerInfo> pullContainerUpdates() {
-      return new ArrayList<UpdatedContainerInfo>();
+      return containerUpdates;
     }
     }
 
 
     @Override
     @Override
@@ -266,7 +284,7 @@ public class MockNodes {
 
 
     @Override
     @Override
     public OverAllocationInfo getOverAllocationInfo() {
     public OverAllocationInfo getOverAllocationInfo() {
-      return null;
+      return this.overAllocationInfo;
     }
     }
 
 
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
@@ -311,6 +329,19 @@ public class MockNodes {
     public Resource getPhysicalResource() {
     public Resource getPhysicalResource() {
       return this.physicalResource;
       return this.physicalResource;
     }
     }
+
+    public void updateResourceUtilization(ResourceUtilization utilization) {
+      this.nodeUtilization = utilization;
+    }
+
+    public void updateContainersAndNodeUtilization(
+        UpdatedContainerInfo updatedContainerInfo,
+        ResourceUtilization resourceUtilization) {
+      if (updatedContainerInfo != null) {
+        containerUpdates = Collections.singletonList(updatedContainerInfo);
+      }
+      this.nodeUtilization = resourceUtilization;
+    }
   };
   };
 
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
   private static RMNode buildRMNode(int rack, final Resource perNode,
@@ -334,6 +365,15 @@ public class MockNodes {
       NodeState state, String httpAddr, int hostnum, String hostName, int port,
       NodeState state, String httpAddr, int hostnum, String hostName, int port,
       Set<String> labels, ResourceUtilization containersUtilization,
       Set<String> labels, ResourceUtilization containersUtilization,
       ResourceUtilization nodeUtilization, Resource physicalResource) {
       ResourceUtilization nodeUtilization, Resource physicalResource) {
+    return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
+        labels, containersUtilization, nodeUtilization, physicalResource, null);
+  }
+
+  private static MockRMNodeImpl buildRMNode(int rack, final Resource perNode,
+      NodeState state, String httpAddr, int hostnum, String hostName, int port,
+      Set<String> labels, ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization, Resource physicalResource,
+      OverAllocationInfo overAllocationInfo) {
     final String rackName = "rack"+ rack;
     final String rackName = "rack"+ rack;
     final int nid = hostnum;
     final int nid = hostnum;
     final String nodeAddr = hostName + ":" + nid;
     final String nodeAddr = hostName + ":" + nid;
@@ -346,9 +386,9 @@ public class MockNodes {
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
     return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
     return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
         rackName, healthReport, 0, nid, hostName, state, labels,
         rackName, healthReport, 0, nid, hostName, state, labels,
-        containersUtilization, nodeUtilization, physicalResource);
+        containersUtilization, nodeUtilization,
+        physicalResource, overAllocationInfo);
   }
   }
-
   public static RMNode nodeInfo(int rack, final Resource perNode,
   public static RMNode nodeInfo(int rack, final Resource perNode,
       NodeState state) {
       NodeState state) {
     return buildRMNode(rack, perNode, state, "N/A");
     return buildRMNode(rack, perNode, state, "N/A");
@@ -377,4 +417,10 @@ public class MockNodes {
     return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port);
     return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0", hostnum, hostName, port);
   }
   }
 
 
+  public static MockRMNodeImpl newNodeInfo(int rack, final Resource perNode,
+      OverAllocationInfo overAllocationInfo) {
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0",
+        NODE_ID++, null, 123, null, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f), null, overAllocationInfo);
+  }
 }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -508,7 +508,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     FSParentQueue root = scheduler.getQueueManager().getRootQueue();
     FSParentQueue root = scheduler.getQueueManager().getRootQueue();
     // ************ check cluster used Resources ********
     // ************ check cluster used Resources ********
     assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
     assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
-    assertEquals(usedResources,root.getResourceUsage());
+    assertEquals(usedResources, root.getGuaranteedResourceUsage());
 
 
     // ************ check app headroom ****************
     // ************ check app headroom ****************
     FSAppAttempt schedulerAttempt =
     FSAppAttempt schedulerAttempt =

+ 7 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java

@@ -84,7 +84,7 @@ public class FakeSchedulable implements Schedulable {
   }
   }
   
   
   @Override
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     return null;
     return null;
   }
   }
 
 
@@ -114,10 +114,15 @@ public class FakeSchedulable implements Schedulable {
   }
   }
 
 
   @Override
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return usage;
     return usage;
   }
   }
 
 
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    return Resource.newInstance(0, 0);
+  }
+
   @Override
   @Override
   public long getStartTime() {
   public long getStartTime() {
     return startTime;
     return startTime;

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java

@@ -222,7 +222,8 @@ public class TestAppRunnability extends FairSchedulerTestBase {
     scheduler.handle(nodeEvent);
     scheduler.handle(nodeEvent);
     scheduler.handle(updateEvent);
     scheduler.handle(updateEvent);
 
 
-    assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1),
+        oldQueue.getGuaranteedResourceUsage());
     scheduler.update();
     scheduler.update();
     assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
     assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
 
 
@@ -231,8 +232,10 @@ public class TestAppRunnability extends FairSchedulerTestBase {
     assertSame(targetQueue, app.getQueue());
     assertSame(targetQueue, app.getQueue());
     assertFalse(oldQueue.isRunnableApp(app));
     assertFalse(oldQueue.isRunnableApp(app));
     assertTrue(targetQueue.isRunnableApp(app));
     assertTrue(targetQueue.isRunnableApp(app));
-    assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
-    assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(0, 0),
+        oldQueue.getGuaranteedResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1),
+        targetQueue.getGuaranteedResourceUsage());
     assertEquals(0, oldQueue.getNumRunnableApps());
     assertEquals(0, oldQueue.getNumRunnableApps());
     assertEquals(1, targetQueue.getNumRunnableApps());
     assertEquals(1, targetQueue.getNumRunnableApps());
     assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());
     assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java

@@ -223,7 +223,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
 
 
     Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
     Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
     Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
     Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
-    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockQueue.getGuaranteedResourceUsage()).thenReturn(queueUsage);
     Mockito.when(mockScheduler.getClusterResource()).thenReturn
     Mockito.when(mockScheduler.getClusterResource()).thenReturn
         (clusterResource);
         (clusterResource);
     Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
     Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
@@ -305,7 +305,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase {
             getApplicationId()));
             getApplicationId()));
     FSAppAttempt app = scheduler.getSchedulerApp(id11);
     FSAppAttempt app = scheduler.getSchedulerApp(id11);
     assertNotNull(app);
     assertNotNull(app);
-    Resource queueUsage = app.getQueue().getResourceUsage();
+    Resource queueUsage = app.getQueue().getGuaranteedResourceUsage();
     assertEquals(0, queueUsage.getMemorySize());
     assertEquals(0, queueUsage.getMemorySize());
     assertEquals(0, queueUsage.getVirtualCores());
     assertEquals(0, queueUsage.getVirtualCores());
     SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());
     SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java

@@ -88,7 +88,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
 
 
     FSAppAttempt app = mock(FSAppAttempt.class);
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(maxResource);
     Mockito.when(app.getDemand()).thenReturn(maxResource);
-    Mockito.when(app.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(Resources.none());
 
 
     schedulable.addApp(app, true);
     schedulable.addApp(app, true);
     schedulable.addApp(app, true);
     schedulable.addApp(app, true);
@@ -176,7 +176,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
       @Override
       @Override
       public void run() {
       public void run() {
         for (int i=0; i < 500; i++) {
         for (int i=0; i < 500; i++) {
-          schedulable.getResourceUsage();
+          schedulable.getGuaranteedResourceUsage();
         }
         }
       }
       }
     });
     });

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java

@@ -98,7 +98,7 @@ public class TestFSSchedulerNode {
     ApplicationAttemptId appAttemptId =
     ApplicationAttemptId appAttemptId =
         mock(ApplicationAttemptId.class);
         mock(ApplicationAttemptId.class);
     when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
     when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
-    when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
+    when(starvingApp.assignContainer(schedulerNode, false)).thenAnswer(
         new Answer<Resource>() {
         new Answer<Resource>() {
           @Override
           @Override
           public Resource answer(InvocationOnMock invocationOnMock)
           public Resource answer(InvocationOnMock invocationOnMock)
@@ -142,7 +142,7 @@ public class TestFSSchedulerNode {
   }
   }
 
 
   private void allocateContainers(FSSchedulerNode schedulerNode) {
   private void allocateContainers(FSSchedulerNode schedulerNode) {
-    FairScheduler.assignPreemptedContainers(schedulerNode);
+    FairScheduler.attemptToAssignPreemptedResources(schedulerNode);
   }
   }
 
 
   /**
   /**

+ 421 - 47
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -57,8 +57,13 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.NodeState;
@@ -66,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -77,6 +83,8 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -99,6 +107,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 
 
 
 
@@ -1062,15 +1071,15 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals(
     assertEquals(
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
         YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
         scheduler.getQueueManager().getQueue("queue1").
         scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+            getGuaranteedResourceUsage().getMemorySize());
 
 
     NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2);
     scheduler.handle(updateEvent2);
     scheduler.handle(updateEvent2);
 
 
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
-      getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(2, scheduler.getQueueManager().getQueue("queue1").
-      getResourceUsage().getVirtualCores());
+        getGuaranteedResourceUsage().getVirtualCores());
 
 
     // verify metrics
     // verify metrics
     QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
     QueueMetrics queue1Metrics = scheduler.getQueueManager().getQueue("queue1")
@@ -1105,7 +1114,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 1 is allocated app capacity
     // Make sure queue 1 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Now queue 2 requests likewise
     // Now queue 2 requests likewise
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1115,7 +1124,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 2 is waiting with a reservation
     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
 
 
     // Now another node checks in with capacity
     // Now another node checks in with capacity
@@ -1129,7 +1138,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure this goes to queue 2
     // Make sure this goes to queue 2
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // The old reservation should still be there...
     // The old reservation should still be there...
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
     assertEquals(1024, scheduler.getSchedulerApp(attId).getCurrentReservation().getMemorySize());
@@ -1139,7 +1148,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
   }
   }
 
 
-  @Test (timeout = 5000)
+  @Test
   public void testOffSwitchAppReservationThreshold() throws Exception {
   public void testOffSwitchAppReservationThreshold() throws Exception {
     conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
     conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
     scheduler.init(conf);
     scheduler.init(conf);
@@ -1179,7 +1188,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Verify capacity allocation
     // Verify capacity allocation
     assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Create new app with a resource request that can be satisfied by any
     // Create new app with a resource request that can be satisfied by any
     // node but would be
     // node but would be
@@ -1211,7 +1220,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.update();
     scheduler.update();
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1272,7 +1281,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Verify capacity allocation
     // Verify capacity allocation
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Create new app with a resource request that can be satisfied by any
     // Create new app with a resource request that can be satisfied by any
     // node but would be
     // node but would be
@@ -1317,7 +1326,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.update();
     scheduler.update();
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     scheduler.handle(new NodeUpdateSchedulerEvent(node4));
     assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node1));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -1361,7 +1370,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Verify capacity allocation
     // Verify capacity allocation
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
-            getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Verify number of reservations have decremented
     // Verify number of reservations have decremented
     assertEquals(0,
     assertEquals(0,
@@ -1405,7 +1414,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 1 is allocated app capacity
     // Make sure queue 1 is allocated app capacity
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Now queue 2 requests likewise
     // Now queue 2 requests likewise
     createSchedulingRequest(1024, "queue2", "user2", 1);
     createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1414,7 +1423,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 2 is allocated app capacity
     // Make sure queue 2 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
     scheduler.update();
@@ -1540,7 +1549,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 1 is allocated app capacity
     // Make sure queue 1 is allocated app capacity
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Now queue 2 requests likewise
     // Now queue 2 requests likewise
     createSchedulingRequest(1024, "queue2", "user2", 1);
     createSchedulingRequest(1024, "queue2", "user2", 1);
@@ -1549,7 +1558,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 2 is allocated app capacity
     // Make sure queue 2 is allocated app capacity
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-      getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     
     
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
     scheduler.update();
     scheduler.update();
@@ -1589,12 +1598,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure allocated memory of queue1 doesn't exceed its maximum
     // Make sure allocated memory of queue1 doesn't exceed its maximum
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     //the reservation of queue1 should be reclaim
     //the reservation of queue1 should be reclaim
     assertEquals(0, scheduler.getSchedulerApp(attId1).
     assertEquals(0, scheduler.getSchedulerApp(attId1).
         getCurrentReservation().getMemorySize());
         getCurrentReservation().getMemorySize());
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
   }
   }
 
 
   @Test
   @Test
@@ -1634,7 +1643,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 1 is allocated app capacity
     // Make sure queue 1 is allocated app capacity
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // Now queue 2 requests below threshold
     // Now queue 2 requests below threshold
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@@ -1643,7 +1652,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 2 has no reservation
     // Make sure queue 2 has no reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(0,
     assertEquals(0,
         scheduler.getSchedulerApp(attId).getReservedContainers().size());
         scheduler.getSchedulerApp(attId).getReservedContainers().size());
 
 
@@ -1654,7 +1663,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure queue 2 is waiting with a reservation
     // Make sure queue 2 is waiting with a reservation
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
         .getVirtualCores());
         .getVirtualCores());
 
 
@@ -1669,7 +1678,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     // Make sure this goes to queue 2
     // Make sure this goes to queue 2
     assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
     assertEquals(3, scheduler.getQueueManager().getQueue("queue2").
-        getResourceUsage().getVirtualCores());
+        getGuaranteedResourceUsage().getVirtualCores());
 
 
     // The old reservation should still be there...
     // The old reservation should still be there...
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
     assertEquals(3, scheduler.getSchedulerApp(attId).getCurrentReservation()
@@ -2702,7 +2711,361 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         2, liveContainers.iterator().next().getContainer().
         2, liveContainers.iterator().next().getContainer().
             getPriority().getPriority());
             getPriority().getPriority());
   }
   }
-  
+
+  /**
+   * Test that NO OPPORTUNISTIC containers can be allocated on a node that
+   * is fully allocated and with a very high utilization.
+   */
+  @Test
+  public void testAllocateNoOpportunisticContainersOnBusyNode()
+      throws IOException {
+    conf.setBoolean(
+        YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED, true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(2048, 2), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the node's full memory
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(2048, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization shoots up after the container runs on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(2000, 0, 0.8f));
+
+      // create another scheduling request
+      ApplicationAttemptId appAttempt2
+          = createSchedulingRequest(100, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue("Expecting no containers allocated",
+          allocatedContainers2.size() == 0);
+      assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that a reservation is made for the second resource request
+      Resource reserved = scheduler.getNode(node.getNodeID()).
+          getReservedContainer().getReservedResource();
+      assertTrue("Expect a reservation made for the second resource request",
+          reserved.equals(Resource.newInstance(100, 1)));
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test that OPPORTUNISTIC containers can be allocated on a node with low
+   * utilization even though there is not enough unallocated resource on the
+   * node to accommodate the request.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnPartiallyOverAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that leaves some unallocated resources
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3600, "queue1", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3600, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request that asks for more than what's left
+      // unallocated on the node but can be served with overallocation.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made because we have satisfied" +
+          " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test opportunistic containers can be allocated on a node that is fully
+   * allocated but whose utilization is very low.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersOnFullyAllocatedNode()
+      throws IOException {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request that takes up the whole node
+      ApplicationAttemptId appAttempt1 = createSchedulingRequest(
+          4096, "queue1", "user1", 4);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(1800, 0, 0.5f));
+
+      // create another scheduling request now that there is no unallocated
+      // resources left on the node, the request should be served with an
+      // allocation of an opportunistic container
+      ApplicationAttemptId appAttempt2 = createSchedulingRequest(
+          1024, "queue2", "user1", 1);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+          getOpportunisticResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers2.get(0).getExecutionType());
+
+      // verify that no reservation is made for the second request given
+      // that it's satisfied by an OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made because we have satisfied" +
+              " the second request with an OPPORTUNISTIC container allocation",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
+  /**
+   * Test opportunistic containers can be allocated on a node with a low
+   * utilization even though there are GUARANTEED containers allocated.
+   */
+  @Test
+  public void testAllocateOpportunisticContainersWithGuaranteedOnes()
+      throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        true);
+    // disable resource request normalization in fair scheduler
+    int memoryAllocationIncrement = conf.getInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+        FairSchedulerConfiguration.
+            DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
+    conf.setInt(
+        FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 1);
+    int memoryAllocationMinimum = conf.getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1);
+
+    try {
+      scheduler.init(conf);
+      scheduler.start();
+      scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+      // Add a node with 4G of memory and 4 vcores and an overallocation
+      // threshold of 0.75f and 0.75f for memory and cpu respectively
+      OverAllocationInfo overAllocationInfo = OverAllocationInfo.newInstance(
+          ResourceThresholds.newInstance(0.75f, 0.75f));
+      MockNodes.MockRMNodeImpl node = MockNodes.newNodeInfo(1,
+          Resources.createResource(4096, 4), overAllocationInfo);
+      scheduler.handle(new NodeAddedSchedulerEvent(node));
+
+      // create a scheduling request
+      ApplicationAttemptId appAttempt1 =
+          createSchedulingRequest(3200, "queue1", "user1", 3);
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(3200, scheduler.getQueueManager().getQueue("queue1").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers1 =
+          scheduler.getSchedulerApp(appAttempt1).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers1.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers1.get(0).getExecutionType());
+
+      // node utilization is low after the container is launched on the node
+      ContainerStatus containerStatus = ContainerStatus.newInstance(
+          allocatedContainers1.get(0).getId(), ContainerState.RUNNING, "",
+          ContainerExitStatus.SUCCESS);
+      node.updateContainersAndNodeUtilization(
+          new UpdatedContainerInfo(Collections.singletonList(containerStatus),
+              Collections.emptyList()),
+          ResourceUtilization.newInstance(512, 0, 0.1f));
+
+      // create two other scheduling requests which in aggregate ask for more
+      // that what's left unallocated on the node.
+      ApplicationAttemptId appAttempt2 =
+          createSchedulingRequest(512, "queue2", "user1", 1);
+      ApplicationAttemptId appAttempt3 =
+          createSchedulingRequest(1024, "queue3", "user1", 1);
+
+      scheduler.handle(new NodeUpdateSchedulerEvent(node));
+      assertEquals(512, scheduler.getQueueManager().getQueue("queue2").
+          getGuaranteedResourceUsage().getMemorySize());
+      List<Container> allocatedContainers2 =
+          scheduler.getSchedulerApp(appAttempt2).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers2.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.GUARANTEED,
+          allocatedContainers2.get(0).getExecutionType());
+
+      List<Container> allocatedContainers3 =
+          scheduler.getSchedulerApp(appAttempt3).pullNewlyAllocatedContainers();
+      assertTrue(allocatedContainers3.size() == 1);
+      assertEquals("unexpected container execution type",
+          ExecutionType.OPPORTUNISTIC,
+          allocatedContainers3.get(0).getExecutionType());
+      assertEquals(1024, scheduler.getQueueManager().getQueue("queue3").
+          getOpportunisticResourceUsage().getMemorySize());
+
+      // verify that no reservation is made given that the second request should
+      // be satisfied by a GUARANTEED container allocation, the third by an
+      // OPPORTUNISTIC container allocation.
+      assertTrue("No reservation should be made.",
+          scheduler.getNode(node.getNodeID()).getReservedContainer() == null);
+    } finally {
+      conf.setBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+          false);
+      conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          memoryAllocationMinimum);
+      conf.setInt(
+          FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+          memoryAllocationIncrement);
+    }
+  }
+
   @Test
   @Test
   public void testAclSubmitApplication() throws Exception {
   public void testAclSubmitApplication() throws Exception {
     // Set acl's
     // Set acl's
@@ -3692,7 +4055,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         .createAbnormalContainerStatus(container.getContainerId(),
         .createAbnormalContainerStatus(container.getContainerId(),
             SchedulerUtils.COMPLETED_APPLICATION),
             SchedulerUtils.COMPLETED_APPLICATION),
         RMContainerEventType.FINISHED);
         RMContainerEventType.FINISHED);
-    assertEquals(Resources.none(), app1.getResourceUsage());
+    assertEquals(Resources.none(), app1.getGuaranteedResourceUsage());
   }
   }
 
 
   @Test
   @Test
@@ -3792,7 +4155,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application1's AM should be finished",
     assertEquals("Application1's AM should be finished",
         0, app1.getLiveContainers().size());
         0, app1.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app1.getResourceUsage());
+        Resources.none(), app1.getGuaranteedResourceUsage());
     assertEquals("Application3's AM should be running",
     assertEquals("Application3's AM should be running",
         1, app3.getLiveContainers().size());
         1, app3.getLiveContainers().size());
     assertEquals("Application3's AM requests 1024 MB memory",
     assertEquals("Application3's AM requests 1024 MB memory",
@@ -3812,7 +4175,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application4's AM should not be running",
     assertEquals("Application4's AM should not be running",
         0, app4.getLiveContainers().size());
         0, app4.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app4.getResourceUsage());
+        Resources.none(), app4.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
         2048, queue1.getAmResourceUsage().getMemorySize());
 
 
@@ -3828,7 +4191,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should not be running",
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
         2048, queue1.getAmResourceUsage().getMemorySize());
 
 
@@ -3841,7 +4204,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should not be running",
     assertEquals("Application5's AM should not be running",
         0, app5.getLiveContainers().size());
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
         2048, queue1.getAmResourceUsage().getMemorySize());
 
 
@@ -3857,11 +4220,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application2's AM should be finished",
     assertEquals("Application2's AM should be finished",
         0, app2.getLiveContainers().size());
         0, app2.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app2.getResourceUsage());
+        Resources.none(), app2.getGuaranteedResourceUsage());
     assertEquals("Application3's AM should be finished",
     assertEquals("Application3's AM should be finished",
         0, app3.getLiveContainers().size());
         0, app3.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app3.getResourceUsage());
+        Resources.none(), app3.getGuaranteedResourceUsage());
     assertEquals("Application5's AM should be running",
     assertEquals("Application5's AM should be running",
         1, app5.getLiveContainers().size());
         1, app5.getLiveContainers().size());
     assertEquals("Application5's AM requests 2048 MB memory",
     assertEquals("Application5's AM requests 2048 MB memory",
@@ -3882,7 +4245,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application5's AM should have 0 container",
     assertEquals("Application5's AM should have 0 container",
         0, app5.getLiveContainers().size());
         0, app5.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app5.getResourceUsage());
+        Resources.none(), app5.getGuaranteedResourceUsage());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
         2048, queue1.getAmResourceUsage().getMemorySize());
         2048, queue1.getAmResourceUsage().getMemorySize());
     scheduler.update();
     scheduler.update();
@@ -3906,7 +4269,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Application6's AM should not be running",
     assertEquals("Application6's AM should not be running",
         0, app6.getLiveContainers().size());
         0, app6.getLiveContainers().size());
     assertEquals("Finished application usage should be none",
     assertEquals("Finished application usage should be none",
-        Resources.none(), app6.getResourceUsage());
+        Resources.none(), app6.getGuaranteedResourceUsage());
     assertEquals("Application6's AM resource shouldn't be updated",
     assertEquals("Application6's AM resource shouldn't be updated",
         0, app6.getAMResource().getMemorySize());
         0, app6.getAMResource().getMemorySize());
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
     assertEquals("Queue1's AM resource usage should be 2048 MB memory",
@@ -4621,17 +4984,25 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
     FSQueue queue2 = queueMgr.getLeafQueue("parent2.queue2", true);
     FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
     FSQueue queue1 = queueMgr.getLeafQueue("parent1.queue1", true);
 
 
-    Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 1 * GB);
+    Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
 
 
     scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
     scheduler.moveApplication(appAttId.getApplicationId(), "parent2.queue2");
 
 
-    Assert.assertEquals(parent2.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(queue2.getResourceUsage().getMemorySize(), 1 * GB);
-    Assert.assertEquals(parent1.getResourceUsage().getMemorySize(), 0);
-    Assert.assertEquals(queue1.getResourceUsage().getMemorySize(), 0);
+    Assert.assertEquals(parent2.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(queue2.getGuaranteedResourceUsage().getMemorySize(),
+        1 * GB);
+    Assert.assertEquals(parent1.getGuaranteedResourceUsage().getMemorySize(),
+        0);
+    Assert.assertEquals(queue1.getGuaranteedResourceUsage().getMemorySize(),
+        0);
   }
   }
     
     
   @Test (expected = YarnException.class)
   @Test (expected = YarnException.class)
@@ -4671,7 +5042,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(updateEvent);
     scheduler.handle(updateEvent);
     scheduler.handle(updateEvent);
     scheduler.handle(updateEvent);
     
     
-    assertEquals(Resource.newInstance(2048, 2), oldQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(2048, 2),
+        oldQueue.getGuaranteedResourceUsage());
     scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
     scheduler.moveApplication(appAttId.getApplicationId(), "queue2");
   }
   }
   
   
@@ -5117,7 +5489,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
 
 
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(4096, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     //container will be reserved at node1
     //container will be reserved at node1
     RMContainer reservedContainer1 =
     RMContainer reservedContainer1 =
@@ -5137,7 +5509,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         app1, RMAppAttemptState.KILLED, false));
         app1, RMAppAttemptState.KILLED, false));
 
 
     assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
     assertEquals(0, scheduler.getQueueManager().getQueue("queue1").
-        getResourceUsage().getMemorySize());
+        getGuaranteedResourceUsage().getMemorySize());
 
 
     // container will be allocated at node2
     // container will be allocated at node2
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
     scheduler.handle(new NodeUpdateSchedulerEvent(node2));
@@ -5285,10 +5657,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
 
     FSAppAttempt app1 = mock(FSAppAttempt.class);
     FSAppAttempt app1 = mock(FSAppAttempt.class);
     Mockito.when(app1.getDemand()).thenReturn(maxResource);
     Mockito.when(app1.getDemand()).thenReturn(maxResource);
-    Mockito.when(app1.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app1.getGuaranteedResourceUsage()).
+        thenReturn(Resources.none());
     FSAppAttempt app2 = mock(FSAppAttempt.class);
     FSAppAttempt app2 = mock(FSAppAttempt.class);
     Mockito.when(app2.getDemand()).thenReturn(maxResource);
     Mockito.when(app2.getDemand()).thenReturn(maxResource);
-    Mockito.when(app2.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app2.getGuaranteedResourceUsage()).
+        thenReturn(Resources.none());
 
 
     QueueManager queueManager = scheduler.getQueueManager();
     QueueManager queueManager = scheduler.getQueueManager();
     FSParentQueue queue1 = queueManager.getParentQueue("queue1", true);
     FSParentQueue queue1 = queueManager.getParentQueue("queue1", true);
@@ -5344,7 +5718,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     child1.setMaxShare(new ConfigurableResource(resource));
     child1.setMaxShare(new ConfigurableResource(resource));
     FSAppAttempt app = mock(FSAppAttempt.class);
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(resource);
     Mockito.when(app.getDemand()).thenReturn(resource);
-    Mockito.when(app.getResourceUsage()).thenReturn(resource);
+    Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(resource);
     child1.addApp(app, true);
     child1.addApp(app, true);
     child1.updateDemand();
     child1.updateDemand();
 
 
@@ -5380,7 +5754,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
         + " SteadyFairShare: <memory:0, vCores:0>,"
         + " SteadyFairShare: <memory:0, vCores:0>,"
         + " MaxShare: <memory:4096, vCores:4>,"
         + " MaxShare: <memory:4096, vCores:4>,"
         + " MinShare: <memory:0, vCores:0>,"
         + " MinShare: <memory:0, vCores:0>,"
-        + " ResourceUsage: <memory:4096, vCores:4>,"
+        + " Guaranteed ResourceUsage: <memory:4096, vCores:4>,"
         + " Demand: <memory:4096, vCores:4>,"
         + " Demand: <memory:4096, vCores:4>,"
         + " MaxAMShare: 0.5,"
         + " MaxAMShare: 0.5,"
         + " Runnable: 0}";
         + " Runnable: 0}";

+ 8 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java

@@ -243,10 +243,15 @@ public class TestSchedulingPolicy {
       }
       }
 
 
       @Override
       @Override
-      public Resource getResourceUsage() {
+      public Resource getGuaranteedResourceUsage() {
         return usage;
         return usage;
       }
       }
 
 
+      @Override
+      public Resource getOpportunisticResourceUsage() {
+        return Resource.newInstance(0, 0);
+      }
+
       @Override
       @Override
       public Resource getMinShare() {
       public Resource getMinShare() {
         return minShare;
         return minShare;
@@ -278,7 +283,8 @@ public class TestSchedulingPolicy {
       }
       }
 
 
       @Override
       @Override
-      public Resource assignContainer(FSSchedulerNode node) {
+      public Resource assignContainer(FSSchedulerNode node,
+          boolean opportunistic) {
         throw new UnsupportedOperationException();
         throw new UnsupportedOperationException();
       }
       }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/fairscheduler/TestRMWebServicesFairSchedulerCustomResourceTypes.java

@@ -253,7 +253,7 @@ public class TestRMWebServicesFairSchedulerCustomResourceTypes
       final long value) {
       final long value) {
     try {
     try {
       Method incUsedResourceMethod = queue.getClass().getSuperclass()
       Method incUsedResourceMethod = queue.getClass().getSuperclass()
-          .getDeclaredMethod("incUsedResource", Resource.class);
+          .getDeclaredMethod("incUsedGuaranteedResource", Resource.class);
       incUsedResourceMethod.setAccessible(true);
       incUsedResourceMethod.setAccessible(true);
 
 
       Map<String, Long> customResources =
       Map<String, Long> customResources =