YARN-4511. Common scheduler changes to support scheduler-specific oversubscription implementations.

Haibo Chen · 7 years ago · commit ce4c4a7083
30 changed files with 795 additions and 169 deletions
  1. 6 0
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  2. 6 0
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  3. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  4. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
  5. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  6. 11 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  7. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  8. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
  9. 236 87
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  10. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  12. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
  13. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  14. 6 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
  15. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
  16. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
  17. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  18. 21 18
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
  20. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
  21. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
  22. 393 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
  23. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  24. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
  25. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
  26. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
  27. 41 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java
  28. 9 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
  29. 9 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
  30. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

+ 6 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -202,6 +203,11 @@ public class NodeInfo {
       return null;
     }
 
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return null;
+    }
+
     @Override
     public long getUntrackedTimeStamp() {
       return 0;

+ 6 - 0
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -189,6 +190,11 @@ public class RMNodeWrapper implements RMNode {
     return node.getNodeUtilization();
   }
 
+  @Override
+  public OverAllocationInfo getOverAllocationInfo() {
+    return node.getOverAllocationInfo();
+  }
+
   @Override
   public long getUntrackedTimeStamp() {
     return 0;

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -397,7 +397,8 @@ public class ResourceTrackerService extends AbstractService implements
         .getCurrentKey());
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
-        resolve(host), capability, nodeManagerVersion, physicalResource);
+        resolve(host), capability, nodeManagerVersion, physicalResource,
+        request.getOverAllocationInfo());
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java

@@ -51,7 +51,7 @@ public class TempSchedulerNode {
   public static TempSchedulerNode fromSchedulerNode(
       FiCaSchedulerNode schedulerNode) {
     TempSchedulerNode n = new TempSchedulerNode();
-    n.totalResource = Resources.clone(schedulerNode.getTotalResource());
+    n.totalResource = Resources.clone(schedulerNode.getCapacity());
     n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource());
     n.runningContainers = schedulerNode.getCopiedListOfRunningContainers();
     n.reservedContainer = schedulerNode.getReservedContainer();

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.NodeAttribute;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 
 /**
  * Node managers information on available resources 
@@ -116,6 +117,12 @@ public interface RMNode {
    */
   public ResourceUtilization getNodeUtilization();
 
+  /**
+   * Get the overallocation info of the node, i.e. the thresholds under
+   * which it may be oversubscribed.
+   * @return the node overallocation info
+   */
+  OverAllocationInfo getOverAllocationInfo();
+
   /**
    * the physical resources in the node.
    * @return the physical resources in the node.

+ 11 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@@ -112,6 +113,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final WriteLock writeLock;
 
   private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+  private final OverAllocationInfo overallocationInfo;
   private volatile boolean nextHeartBeat = true;
 
   private final NodeId nodeId;
@@ -367,12 +369,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       int cmPort, int httpPort, Node node, Resource capability,
       String nodeManagerVersion) {
     this(nodeId, context, hostName, cmPort, httpPort, node, capability,
-        nodeManagerVersion, null);
+        nodeManagerVersion, null, null);
   }
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
       int cmPort, int httpPort, Node node, Resource capability,
-      String nodeManagerVersion, Resource physResource) {
+      String nodeManagerVersion, Resource physResource,
+      OverAllocationInfo overAllocationInfo) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
@@ -387,6 +390,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     this.nodeManagerVersion = nodeManagerVersion;
     this.timeStamp = 0;
     this.physicalResource = physResource;
+    this.overallocationInfo = overAllocationInfo;
 
     this.latestNodeHeartBeatResponse.setResponseId(0);
 
@@ -536,6 +540,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  @Override
+  public OverAllocationInfo getOverAllocationInfo() {
+    return this.overallocationInfo;
+  }
+
   public void setNodeUtilization(ResourceUtilization nodeUtilization) {
     this.writeLock.lock();
 

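Note on the constructor change above: the nine-argument constructor now chains to the widened one, passing null for both the physical resource and the overallocation info, so existing callers compile unchanged while ResourceTrackerService can thread request.getOverAllocationInfo() through at registration time. The compilable toy below is a minimal sketch of that chaining, not the real class; every name in it is hypothetical (Res stands in for Resource, Info for OverAllocationInfo), and the null-means-disabled reading is our assumption based on the stubs in this patch.

    // Minimal sketch of the RMNodeImpl constructor chaining; hypothetical names.
    final class ToyRMNodeImpl {
      static final class Res { }   // stands in for Resource
      static final class Info { }  // stands in for OverAllocationInfo

      private final Res physicalResource;    // may be null
      private final Info overallocationInfo; // assumed: null means no overallocation

      // Legacy signature: bridges to the widened constructor with nulls,
      // mirroring the patch above.
      ToyRMNodeImpl(String hostName, Res capability) {
        this(hostName, capability, null, null);
      }

      ToyRMNodeImpl(String hostName, Res capability, Res physResource,
          Info overAllocationInfo) {
        this.physicalResource = physResource;
        this.overallocationInfo = overAllocationInfo;
      }

      Info getOverAllocationInfo() { return overallocationInfo; }

      public static void main(String[] args) {
        // Old-style caller: no overallocation info is threaded through.
        System.out.println(new ToyRMNodeImpl("n1", new Res())
            .getOverAllocationInfo()); // prints: null
      }
    }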
+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -357,7 +357,7 @@ public abstract class AbstractYarnScheduler
       }
 
       application.containerLaunchedOnNode(containerId, node.getNodeID());
-      node.containerStarted(containerId);
+      node.containerLaunched(containerId);
     } finally {
       readLock.unlock();
     }
@@ -825,7 +825,7 @@ public abstract class AbstractYarnScheduler
       writeLock.lock();
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
-      Resource oldResource = node.getTotalResource();
+      Resource oldResource = node.getCapacity();
       if (!oldResource.equals(newResource)) {
         // Notify NodeLabelsManager about this change
         rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java

@@ -102,7 +102,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       nodesList.add(node);
 
       // Update cluster capacity
-      Resources.addTo(clusterCapacity, node.getTotalResource());
+      Resources.addTo(clusterCapacity, node.getCapacity());
       staleClusterCapacity = Resources.clone(clusterCapacity);
 
       // Update maximumAllocation
@@ -197,7 +197,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       }
 
       // Update cluster capacity
-      Resources.subtractFrom(clusterCapacity, node.getTotalResource());
+      Resources.subtractFrom(clusterCapacity, node.getCapacity());
       staleClusterCapacity = Resources.clone(clusterCapacity);
 
       // Update maximumAllocation
@@ -259,7 +259,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   }
 
   private void updateMaxResources(SchedulerNode node, boolean add) {
-    Resource totalResource = node.getTotalResource();
+    Resource totalResource = node.getCapacity();
     ResourceInformation[] totalResources;
 
     if (totalResource != null) {

+ 236 - 87
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -60,24 +61,33 @@ public abstract class SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
 
+  private Resource capacity;
   private Resource unallocatedResource = Resource.newInstance(0, 0);
-  private Resource allocatedResource = Resource.newInstance(0, 0);
-  private Resource totalResource;
+
   private RMContainer reservedContainer;
-  private volatile int numContainers;
   private volatile ResourceUtilization containersUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
   private volatile ResourceUtilization nodeUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
 
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, ContainerInfo> launchedContainers =
-      new HashMap<>();
+  private final Map<ContainerId, ContainerInfo>
+      allocatedContainers = new HashMap<>();
+
+  private volatile int numGuaranteedContainers = 0;
+  private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0);
+
+  private volatile int numOpportunisticContainers = 0;
+  private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0);
 
   private final RMNode rmNode;
   private final String nodeName;
   private final RMContext rmContext;
 
+  // The total amount of resources requested by containers that have been
+  // allocated but not yet launched on the node.
+  protected Resource resourceAllocatedPendingLaunch =
+      Resource.newInstance(0, 0);
+
   private volatile Set<String> labels = null;
 
   private volatile Set<NodeAttribute> nodeAttributes = null;
@@ -90,7 +100,7 @@ public abstract class SchedulerNode {
     this.rmNode = node;
     this.rmContext = node.getRMContext();
     this.unallocatedResource = Resources.clone(node.getTotalCapability());
-    this.totalResource = Resources.clone(node.getTotalCapability());
+    this.capacity = Resources.clone(node.getTotalCapability());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -113,9 +123,9 @@ public abstract class SchedulerNode {
    * @param resource Total resources on the node.
    */
   public synchronized void updateTotalResource(Resource resource){
-    this.totalResource = resource;
-    this.unallocatedResource = Resources.subtract(totalResource,
-        this.allocatedResource);
+    this.capacity = Resources.clone(resource);
+    this.unallocatedResource = Resources.subtract(capacity,
+        this.allocatedResourceGuaranteed);
   }
 
   /**
@@ -174,17 +184,83 @@ public abstract class SchedulerNode {
   protected synchronized void allocateContainer(RMContainer rmContainer,
       boolean launchedOnNode) {
     Container container = rmContainer.getContainer();
-    if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-      deductUnallocatedResource(container.getResource());
-      ++numContainers;
+
+    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+      guaranteedContainerResourceAllocated(rmContainer, launchedOnNode);
+    } else {
+      opportunisticContainerResourceAllocated(rmContainer, launchedOnNode);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Assigned container " + container.getId() + " of capacity "
+          + container.getResource() + " and type " +
+          container.getExecutionType() + " on host " + toString());
+    }
+  }
+
+  /**
+   * Handle an allocation of a GUARANTEED container.
+   * @param rmContainer the allocated GUARANTEED container
+   * @param launchedOnNode true if the container has been launched
+   */
+  private void guaranteedContainerResourceAllocated(
+      RMContainer rmContainer, boolean launchedOnNode) {
+    Container container = rmContainer.getContainer();
+
+    if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    allocatedContainers.put(container.getId(),
+        new ContainerInfo(rmContainer, launchedOnNode));
+
+    Resource resource = container.getResource();
+    if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) {
+      Resources.subtractFrom(unallocatedResource, resource);
+    }
+
+    numGuaranteedContainers++;
+  }
+
+  /**
+   * Handle an allocation of an OPPORTUNISTIC container.
+   * @param rmContainer the allocated OPPORTUNISTIC container
+   * @param launchedOnNode true if the container has been launched
+   */
+  private void opportunisticContainerResourceAllocated(
+      RMContainer rmContainer, boolean launchedOnNode) {
+    Container container = rmContainer.getContainer();
+
+    if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
     }
 
-    launchedContainers.put(container.getId(),
+    allocatedContainers.put(rmContainer.getContainerId(),
         new ContainerInfo(rmContainer, launchedOnNode));
+    if (containerResourceAllocated(
+        container.getResource(), allocatedResourceOpportunistic)) {
+      // nothing to do here
+    }
+    numOpportunisticContainers++;
+  }
+
+  private boolean containerResourceAllocated(Resource allocated,
+      Resource aggregatedResources) {
+    if (allocated == null) {
+      LOG.error("Invalid deduction of null resource for "
+          + rmNode.getNodeAddress());
+      return false;
+    }
+    Resources.addTo(resourceAllocatedPendingLaunch, allocated);
+    Resources.addTo(aggregatedResources, allocated);
+    return true;
   }
 
+
   /**
-   * Get unallocated resources on the node.
+   * Get resources that are not allocated to GUARANTEED containers on the node.
    * @return Unallocated resources on the node
    */
   public synchronized Resource getUnallocatedResource() {
@@ -192,42 +268,57 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Get allocated resources on the node.
-   * @return Allocated resources on the node
+   * Get resources allocated to GUARANTEED containers on the node.
+   * @return Resources allocated to GUARANTEED containers on the node
    */
   public synchronized Resource getAllocatedResource() {
-    return this.allocatedResource;
+    return this.allocatedResourceGuaranteed;
+  }
+
+  /**
+   * Get resources allocated to OPPORTUNISTIC containers on the node.
+   * @return Resources allocated to OPPORTUNISTIC containers on the node
+   */
+  public synchronized Resource getOpportunisticResourceAllocated() {
+    return this.allocatedResourceOpportunistic;
+  }
+
+  @VisibleForTesting
+  public synchronized Resource getResourceAllocatedPendingLaunch() {
+    return this.resourceAllocatedPendingLaunch;
   }
 
   /**
    * Get total resources on the node.
    * @return Total resources on the node.
    */
-  public synchronized Resource getTotalResource() {
-    return this.totalResource;
+  public synchronized Resource getCapacity() {
+    return this.capacity;
   }
 
   /**
-   * Check if a container is launched by this node.
+   * Check if a GUARANTEED container is launched by this node.
    * @return If the container is launched by the node.
    */
-  public synchronized boolean isValidContainer(ContainerId containerId) {
-    if (launchedContainers.containsKey(containerId)) {
-      return true;
-    }
-    return false;
+  @VisibleForTesting
+  public synchronized boolean isValidGuaranteedContainer(
+      ContainerId containerId) {
+    ContainerInfo containerInfo = allocatedContainers.get(containerId);
+    return containerInfo != null && ExecutionType.GUARANTEED ==
+        containerInfo.container.getExecutionType();
   }
 
   /**
-   * Update the resources of the node when releasing a container.
-   * @param container Container to release.
+   * Check if an OPPORTUNISTIC container is launched by this node.
+   * @param containerId id of the container to check
+   * @return If the container is launched by the node.
    */
-  protected synchronized void updateResourceForReleasedContainer(
-      Container container) {
-    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
-      addUnallocatedResource(container.getResource());
-      --numContainers;
-    }
+  @VisibleForTesting
+  public synchronized boolean isValidOpportunisticContainer(
+      ContainerId containerId)  {
+    ContainerInfo containerInfo = allocatedContainers.get(containerId);
+    return containerInfo != null && ExecutionType.OPPORTUNISTIC ==
+        containerInfo.container.getExecutionType();
   }
 
   /**
@@ -237,17 +328,30 @@ public abstract class SchedulerNode {
    */
   public synchronized void releaseContainer(ContainerId containerId,
       boolean releasedByNode) {
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info == null) {
+    RMContainer rmContainer = getContainer(containerId);
+    if (rmContainer == null) {
+      LOG.warn("Invalid container " + containerId + " is released.");
+      return;
+    }
+
+    if (!allocatedContainers.containsKey(containerId)) {
+      // do not process if the container is never allocated on the node
       return;
     }
-    if (!releasedByNode && info.launchedOnNode) {
-      // wait until node reports container has completed
+
+    if (!releasedByNode &&
+        allocatedContainers.get(containerId).launchedOnNode) {
+      // the container is running on the node, so wait until the node
+      // itself reports that it has completed.
       return;
     }
 
-    launchedContainers.remove(containerId);
-    Container container = info.container.getContainer();
+    Container container = rmContainer.getContainer();
+    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+      guaranteedContainerReleased(container);
+    } else {
+      opportunisticContainerReleased(container);
+    }
 
     // We remove allocation tags when a container is actually
     // released on NM. This is to avoid running into situation
@@ -260,14 +364,16 @@ public abstract class SchedulerNode {
               container.getId(), container.getAllocationTags());
     }
 
-    updateResourceForReleasedContainer(container);
-
     if (LOG.isDebugEnabled()) {
       LOG.debug("Released container " + container.getId() + " of capacity "
-              + container.getResource() + " on host " + rmNode.getNodeAddress()
-              + ", which currently has " + numContainers + " containers, "
-              + getAllocatedResource() + " used and " + getUnallocatedResource()
-              + " available" + ", release resources=" + true);
+          + container.getResource() + " on host " + rmNode.getNodeAddress()
+          + ", with " + numGuaranteedContainers
+          + " guaranteed containers taking"
+          + getAllocatedResource() + " and " + numOpportunisticContainers
+          + " opportunistic containers taking "
+          + getOpportunisticResourceAllocated()
+          + " and " + getUnallocatedResource() + " (guaranteed) available"
+          + ", release resources=" + true);
     }
   }
 
@@ -275,42 +381,75 @@ public abstract class SchedulerNode {
    * Inform the node that a container has launched.
    * @param containerId ID of the launched container
    */
-  public synchronized void containerStarted(ContainerId containerId) {
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info != null) {
+  public synchronized void containerLaunched(ContainerId containerId) {
+    ContainerInfo info = allocatedContainers.get(containerId);
+    if (info != null && !info.launchedOnNode) {
       info.launchedOnNode = true;
+      Resources.subtractFrom(resourceAllocatedPendingLaunch,
+          info.container.getContainer().getResource());
     }
   }
 
   /**
-   * Add unallocated resources to the node. This is used when unallocating a
-   * container.
-   * @param resource Resources to add.
+   * Handle the release of a GUARANTEED container.
+   * @param container Container to release.
    */
-  private synchronized void addUnallocatedResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid resource addition of null resource for "
-          + rmNode.getNodeAddress());
-      return;
+  protected synchronized void guaranteedContainerReleased(
+      Container container) {
+    if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    if (containerResourceReleased(container, allocatedResourceGuaranteed)) {
+      Resources.addTo(unallocatedResource, container.getResource());
     }
-    Resources.addTo(unallocatedResource, resource);
-    Resources.subtractFrom(allocatedResource, resource);
+    // remove the container from allocatedContainers only after its
+    // resources are released: containerResourceReleased() needs the
+    // container's launch state to decide whether
+    // resourceAllocatedPendingLaunch must also be reduced.
+    allocatedContainers.remove(container.getId());
+    numGuaranteedContainers--;
   }
 
   /**
-   * Deduct unallocated resources from the node. This is used when allocating a
-   * container.
-   * @param resource Resources to deduct.
+   * Handle the release of an OPPORTUNISTIC container.
+   * @param container Container to release.
    */
-  @VisibleForTesting
-  public synchronized void deductUnallocatedResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid deduction of null resource for "
+  private void opportunisticContainerReleased(
+      Container container) {
+    if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    if (containerResourceReleased(container, allocatedResourceOpportunistic)) {
+      // nothing to do here
+    }
+    // remove the container from allocatedContainers only after its
+    // resources are released: containerResourceReleased() needs the
+    // container's launch state to decide whether
+    // resourceAllocatedPendingLaunch must also be reduced.
+    allocatedContainers.remove(container.getId());
+    numOpportunisticContainers--;
+  }
+
+  private boolean containerResourceReleased(Container container,
+      Resource aggregatedResource) {
+    Resource released = container.getResource();
+    if (released == null) {
+      LOG.error("Invalid resource addition of null resource for "
           + rmNode.getNodeAddress());
-      return;
+      return false;
     }
-    Resources.subtractFrom(unallocatedResource, resource);
-    Resources.addTo(allocatedResource, resource);
+    Resources.subtractFrom(aggregatedResource, released);
+
+    if (!allocatedContainers.get(container.getId()).launchedOnNode) {
+      // update resourceAllocatedPendingLaunch if the container has
+      // not yet been launched on the node
+      Resources.subtractFrom(resourceAllocatedPendingLaunch, released);
+    }
+    return true;
   }
 
   /**
@@ -330,17 +469,28 @@ public abstract class SchedulerNode {
 
   @Override
   public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers="
-        + getNumContainers() + " available=" + getUnallocatedResource()
-        + " used=" + getAllocatedResource();
+    return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" +
+        getNumGuaranteedContainers() + " #opportunistic containers="  +
+        getNumOpportunisticContainers() + " available=" +
+        getUnallocatedResource() + " used by guaranteed containers=" +
+        allocatedResourceGuaranteed + " used by opportunistic containers=" +
+        allocatedResourceOpportunistic;
+  }
+
+  /**
+   * Get number of active GUARANTEED containers on the node.
+   * @return Number of active GUARANTEED containers on the node.
+   */
+  public int getNumGuaranteedContainers() {
+    return numGuaranteedContainers;
   }
 
   /**
-   * Get number of active containers on the node.
-   * @return Number of active containers on the node.
+   * Get number of active OPPORTUNISTIC containers on the node.
+   * @return Number of active OPPORTUNISTIC containers on the node.
    */
-  public int getNumContainers() {
-    return numContainers;
+  public int getNumOpportunisticContainers() {
+    return numOpportunisticContainers;
   }
 
   /**
@@ -348,8 +498,8 @@ public abstract class SchedulerNode {
    * @return A copy of containers running on the node.
    */
   public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
-    List<RMContainer> result = new ArrayList<>(launchedContainers.size());
-    for (ContainerInfo info : launchedContainers.values()) {
+    List<RMContainer> result = new ArrayList<>(allocatedContainers.size());
+    for (ContainerInfo info : allocatedContainers.values()) {
       result.add(info.container);
     }
     return result;
@@ -359,12 +509,14 @@ public abstract class SchedulerNode {
    * Get the containers running on the node with AM containers at the end.
    * @return A copy of running containers with AM containers at the end.
    */
-  public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
+  public synchronized List<RMContainer>
+      getRunningGuaranteedContainersWithAMsAtTheEnd() {
     LinkedList<RMContainer> result = new LinkedList<>();
-    for (ContainerInfo info : launchedContainers.values()) {
+    for (ContainerInfo info : allocatedContainers.values()) {
       if(info.container.isAMContainer()) {
         result.addLast(info.container);
-      } else {
+      } else if (info.container.getExecutionType() ==
+          ExecutionType.GUARANTEED) {
         result.addFirst(info.container);
       }
     }
@@ -377,12 +529,9 @@ public abstract class SchedulerNode {
    * @return The container for the specified container ID
    */
   protected synchronized RMContainer getContainer(ContainerId containerId) {
-    RMContainer container = null;
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info != null) {
-      container = info.container;
-    }
-    return container;
+    ContainerInfo info = allocatedContainers.get(containerId);
+
+    return info != null ? info.container : null;
   }
 
   /**

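Taken together, the SchedulerNode rewrite replaces the single allocatedResource/numContainers pair with per-ExecutionType aggregates and adds resourceAllocatedPendingLaunch, which tracks containers that are allocated but not yet launched; only GUARANTEED allocations consume the node's unallocated capacity. The standalone toy below sketches that bookkeeping under those assumptions; ToyNode and all of its members are hypothetical, and it models resources as plain megabyte counts rather than Resource objects.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical stand-in that mirrors SchedulerNode's new bookkeeping.
    public class ToyNode {
      enum ExecType { GUARANTEED, OPPORTUNISTIC }

      static final class Alloc {
        final long memMb;
        final ExecType type;
        boolean launched; // mirrors ContainerInfo.launchedOnNode
        Alloc(long memMb, ExecType type) { this.memMb = memMb; this.type = type; }
      }

      private final long capacityMb;          // was totalResource, now "capacity"
      private long unallocatedMb;             // capacity minus GUARANTEED usage
      private long guaranteedMb;              // per-type aggregates
      private long opportunisticMb;
      private long pendingLaunchMb;           // allocated but not yet launched
      private final Map<Integer, Alloc> allocated = new HashMap<>();

      ToyNode(long capacityMb) {
        this.capacityMb = capacityMb;
        this.unallocatedMb = capacityMb;
      }

      void allocate(int id, long memMb, ExecType type) {
        allocated.put(id, new Alloc(memMb, type));
        pendingLaunchMb += memMb;             // counted until containerLaunched()
        if (type == ExecType.GUARANTEED) {
          guaranteedMb += memMb;
          unallocatedMb -= memMb;             // only GUARANTEED consumes capacity
        } else {
          opportunisticMb += memMb;           // tracked, capacity untouched
        }
      }

      void containerLaunched(int id) {
        Alloc a = allocated.get(id);
        if (a != null && !a.launched) {       // same guard as the patch
          a.launched = true;
          pendingLaunchMb -= a.memMb;
        }
      }

      void release(int id) {
        Alloc a = allocated.remove(id);
        if (a == null) {
          return;                             // unknown container: ignore
        }
        if (!a.launched) {
          pendingLaunchMb -= a.memMb;         // never launched: un-count it
        }
        if (a.type == ExecType.GUARANTEED) {
          guaranteedMb -= a.memMb;
          unallocatedMb += a.memMb;
        } else {
          opportunisticMb -= a.memMb;
        }
      }

      public static void main(String[] args) {
        ToyNode n = new ToyNode(10240);
        n.allocate(0, 4096, ExecType.GUARANTEED);
        n.allocate(1, 8192, ExecType.OPPORTUNISTIC); // may exceed free capacity
        n.containerLaunched(0);
        System.out.println(n.unallocatedMb + " MB unallocated, "
            + n.pendingLaunchMb + " MB pending launch"); // 6144 MB, 8192 MB
      }
    }

Note how the OPPORTUNISTIC allocation never touches unallocatedMb, matching the revised Javadoc on getUnallocatedResource(), and how release() must inspect the launch flag before the entry disappears; that is exactly why the real patch delays allocatedContainers.remove() until after containerResourceReleased(), which looks the launch flag up in that map.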
+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java

@@ -31,11 +31,11 @@ public class SchedulerNodeReport {
   private final Resource used;
   private final Resource avail;
   private final int num;
-  
+
   public SchedulerNodeReport(SchedulerNode node) {
     this.used = node.getAllocatedResource();
     this.avail = node.getUnallocatedResource();
-    this.num = node.getNumContainers();
+    this.num = node.getNumGuaranteedContainers();
   }
   
   /**

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -1986,7 +1986,7 @@ public class CapacityScheduler extends
       // update this node to node label manager
       if (labelManager != null) {
         labelManager.activateNode(nodeManager.getNodeID(),
-            schedulerNode.getTotalResource());
+            schedulerNode.getCapacity());
       }
 
       // recover attributes from store if any.

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java

@@ -514,13 +514,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
 
     Resource capability = pendingAsk.getPerAllocationResource();
     Resource available = node.getUnallocatedResource();
-    Resource totalResource = node.getTotalResource();
+    Resource totalResource = node.getCapacity();
 
     if (!Resources.lessThanOrEqual(rc, clusterResource,
         capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for ask : " + pendingAsk
-          + " node total capability : " + node.getTotalResource());
+          + " node total capability : " + node.getCapacity());
       // Skip this locality request
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -1072,7 +1072,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       diagnosticMessageBldr.append(" ( Partition : ");
       diagnosticMessageBldr.append(node.getLabels());
       diagnosticMessageBldr.append(", Total resource : ");
-      diagnosticMessageBldr.append(node.getTotalResource());
+      diagnosticMessageBldr.append(node.getCapacity());
       diagnosticMessageBldr.append(", Available resource : ");
       diagnosticMessageBldr.append(node.getUnallocatedResource());
       diagnosticMessageBldr.append(" ).");

+ 6 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -144,9 +144,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  protected synchronized void updateResourceForReleasedContainer(
+  protected synchronized void guaranteedContainerReleased(
       Container container) {
-    super.updateResourceForReleasedContainer(container);
+    super.guaranteedContainerReleased(container);
     if (killableContainers.containsKey(container.getId())) {
       Resources.subtractFrom(totalKillableResources, container.getResource());
       killableContainers.remove(container.getId());
@@ -168,9 +168,10 @@ public class FiCaSchedulerNode extends SchedulerNode {
     final Container container = rmContainer.getContainer();
     LOG.info("Assigned container " + container.getId() + " of capacity "
           + container.getResource() + " on host " + getRMNode().getNodeAddress()
-          + ", which has " + getNumContainers() + " containers, "
-          + getAllocatedResource() + " used and " + getUnallocatedResource()
-          + " available after allocation");
+          + ", which has " + getNumGuaranteedContainers() + " guaranteed"
+          + " containers using " + getAllocatedResource() + ", "
+          + getNumOpportunisticContainers() + " opportunistic containers"
+          + " using " + getOpportunisticResourceAllocated());
   }
 
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java

@@ -193,7 +193,7 @@ class FSPreemptionThread extends Thread {
 
     // Figure out list of containers to consider
     List<RMContainer> containersToCheck =
-        node.getRunningContainersWithAMsAtTheEnd();
+        node.getRunningGuaranteedContainersWithAMsAtTheEnd();
     containersToCheck.removeAll(node.getContainersForPreemption());
 
     // Initialize potential with unallocated but not reserved resources

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java

@@ -242,11 +242,12 @@ public class FSSchedulerNode extends SchedulerNode {
     super.allocateContainer(rmContainer, launchedOnNode);
     if (LOG.isDebugEnabled()) {
       final Container container = rmContainer.getContainer();
-      LOG.debug("Assigned container " + container.getId() + " of capacity "
+      LOG.info("Assigned container " + container.getId() + " of capacity "
           + container.getResource() + " on host " + getRMNode().getNodeAddress()
-          + ", which has " + getNumContainers() + " containers, "
-          + getAllocatedResource() + " used and " + getUnallocatedResource()
-          + " available after allocation");
+          + ", which has " + getNumGuaranteedContainers() + " guaranteed "
+          + "containers using " + getAllocatedResource() + ", "
+          + getNumOpportunisticContainers() + " opportunistic containers "
+          + "using " + getOpportunisticResourceAllocated());
     }
 
     Resource allocated = rmContainer.getAllocatedResource();

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -263,6 +264,11 @@ public class MockNodes {
       return this.nodeUtilization;
     }
 
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return null;
+    }
+
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
       return OpportunisticContainersStatus.newInstance();
     }

+ 21 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -232,13 +232,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     Resource nmResource =
         Resource.newInstance(nm1.getMemory(), nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(schedulerNode1.isValidContainer(runningContainer
-      .getContainerId()));
-    assertFalse(schedulerNode1.isValidContainer(completedContainer
-      .getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
       schedulerNode1.getUnallocatedResource());
@@ -389,13 +390,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     Resource nmResource =
         Resource.newInstance(nm1.getMemory(), nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(
-        schedulerNode1.isValidContainer(runningContainer.getContainerId()));
-    assertFalse(
-        schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
         schedulerNode1.getUnallocatedResource());
@@ -1700,13 +1702,14 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     Resource nmResource = Resource.newInstance(nm1.getMemory(),
         nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(
-        schedulerNode1.isValidContainer(runningContainer.getContainerId()));
-    assertFalse(
-        schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
         schedulerNode1.getUnallocatedResource());

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java

@@ -519,7 +519,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
           totalRes = parseResourceFromString(resSring);
         }
       }
-      when(sn.getTotalResource()).thenReturn(totalRes);
+      when(sn.getCapacity()).thenReturn(totalRes);
       when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
 
       // TODO, add settings of killable resources when necessary

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java

@@ -237,17 +237,17 @@ public class TestProportionalCapacityPreemptionPolicyMockFramework
     // Check host resources
     Assert.assertEquals(3, this.cs.getAllNodes().size());
     SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
-    Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node1.getCapacity().getMemorySize());
     Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
     Assert.assertNull(node1.getReservedContainer());
 
     SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
-    Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+    Assert.assertEquals(0, node2.getCapacity().getMemorySize());
     Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
     Assert.assertNotNull(node2.getReservedContainer());
 
     SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
-    Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+    Assert.assertEquals(30, node3.getCapacity().getMemorySize());
     Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
     Assert.assertNull(node3.getReservedContainer());
   }

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java

@@ -289,12 +289,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       SchedulerNode mockNode1 = mock(SchedulerNode.class);
       when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
       when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource);
-      when(mockNode1.getTotalResource()).thenReturn(fullResource1);
+      when(mockNode1.getCapacity()).thenReturn(fullResource1);
 
       SchedulerNode mockNode2 = mock(SchedulerNode.class);
       when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
       when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource);
-      when(mockNode2.getTotalResource()).thenReturn(fullResource2);
+      when(mockNode2.getCapacity()).thenReturn(fullResource2);
 
       verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
 
@@ -482,7 +482,7 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
 
       // mock container start
       rm1.getRMContext().getScheduler()
-          .getSchedulerNode(nm1.getNodeId()).containerStarted(cid);
+          .getSchedulerNode(nm1.getNodeId()).containerLaunched(cid);
 
       // verifies the allocation is made with correct number of tags
       Map<String, Long> nodeTags = rm1.getRMContext()

+ 393 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java

@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for SchedulerNode.
+ */
+public class TestSchedulerNode {
+  private final Resource nodeCapacity = Resource.newInstance(1024*10, 4);
+
+  @Test
+  public void testAllocateAndReleaseGuaranteedContainer() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, resource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched", resource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(containerId));
+
+    // launch the container on the node
+    schedulerNode.containerLaunched(containerId);
+
+    Assert.assertEquals("The container should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release the container
+    schedulerNode.releaseContainer(containerId, true);
+    Assert.assertEquals("The container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+  }
+
+  @Test
+  public void testAllocateAndReleaseOpportunisticContainer() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getOpportunisticResourceAllocated());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        nodeCapacity, schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched", resource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidOpportunisticContainer(containerId));
+
+    // launch the container on the node
+    schedulerNode.containerLaunched(containerId);
+
+    Assert.assertEquals("The container should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release the container
+    schedulerNode.releaseContainer(containerId, true);
+    Assert.assertEquals("The container should have been released",
+        0, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getOpportunisticResourceAllocated());
+    Assert.assertFalse("The container should have been released",
+        schedulerNode.isValidOpportunisticContainer(containerId));
+  }
+
+  @Test
+  public void testAllocateAndReleaseContainers() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+
+    Resource guaranteedResource = Resource.newInstance(4096, 1);
+    RMContainer guaranteedContainer =
+        createRMContainer(0, guaranteedResource,
+            ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId guaranteedContainerId = guaranteedContainer.getContainerId();
+
+    // allocate a guaranteed container on the node
+    schedulerNode.allocateContainer(guaranteedContainer);
+
+    Assert.assertEquals("The guaranteed container should have been allocated",
+        guaranteedResource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, guaranteedResource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The guaranteed container should have been allocated" +
+            " but not launched", guaranteedResource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+
+    Resource opportunisticResource = Resource.newInstance(8192, 4);
+    RMContainer opportunisticContainer =
+        createRMContainer(1, opportunisticResource,
+            ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode());
+    ContainerId opportunisticContainerId =
+        opportunisticContainer.getContainerId();
+
+    // allocate an opportunistic container on the node
+    schedulerNode.allocateContainer(opportunisticContainer);
+
+    Assert.assertEquals("The opportunistic container should have been" +
+        " allocated", opportunisticResource,
+        schedulerNode.getOpportunisticResourceAllocated());
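+    // Only the guaranteed container counts against the node's unallocated
+    // resource; the opportunistic allocation leaves it unchanged.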
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, guaranteedResource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The opportunistic container should also have been" +
+        " allocated but not launched",
+        Resources.add(guaranteedResource, opportunisticResource),
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+
+    // launch both containers on the node
+    schedulerNode.containerLaunched(guaranteedContainerId);
+    schedulerNode.containerLaunched(opportunisticContainerId);
+
+    Assert.assertEquals("Both containers should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release both containers
+    schedulerNode.releaseContainer(guaranteedContainerId, true);
+    schedulerNode.releaseContainer(opportunisticContainerId, true);
+
+    Assert.assertEquals("The guaranteed container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The opportunistic container should have been released",
+        0, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertEquals("The guaranteed container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    Assert.assertEquals("The opportunistic container should have been released",
+        Resources.none(), schedulerNode.getOpportunisticResourceAllocated());
+    Assert.assertFalse("The guaranteed container should have been released",
+        schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+    Assert.assertFalse("The opportunistic container should have been released",
+        schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+  }
+
+  @Test
+  public void testReleaseLaunchedContainerNotAsNode() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, resource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched", resource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(containerId));
+
+    // launch the container on the node
+    schedulerNode.containerLaunched(containerId);
+
+    Assert.assertEquals("The container should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // attempt to release the launched container, but not as the node
+    schedulerNode.releaseContainer(containerId, false);
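+    // A launched container is expected to survive a release that is not
+    // initiated by the node, since only the node can confirm its exit.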
+    Assert.assertEquals("The container should not have been released",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The container should not have been released",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertTrue("The container should not have been released",
+        schedulerNode.isValidGuaranteedContainer(containerId));
+  }
+
+  @Test
+  public void testReleaseUnlaunchedContainerAsNode() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, resource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched",
+        resource, schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(containerId));
+
+    // make sure the container is not launched yet
+    Assert.assertEquals("The container should not be launched already",
+        resource, schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release the unlaunched container as the node
+    schedulerNode.releaseContainer(containerId, true);
+    Assert.assertEquals("The container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    Assert.assertFalse("The container should have been released",
+        schedulerNode.isValidGuaranteedContainer(containerId));
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+  }
+
+  @Test
+  public void testReleaseUnlaunchedContainerNotAsNode() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, resource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched", resource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(containerId));
+
+    // make sure the container is not launched yet
+    Assert.assertEquals("The container should not have been launched",
+        resource, schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release the unlaunched container, not as the node
+    schedulerNode.releaseContainer(containerId, false);
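+    // The container was never launched, so it can be released even though
+    // the release is not initiated by the node.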
+    Assert.assertEquals("The container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    Assert.assertFalse("The container should have been released",
+        schedulerNode.isValidGuaranteedContainer(containerId));
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+  }
+
+  private SchedulerNode createSchedulerNode(Resource capacity) {
+    NodeId nodeId = NodeId.newInstance("localhost", 0);
+
+    RMNode rmNode = mock(RMNode.class);
+    when(rmNode.getNodeID()).thenReturn(nodeId);
+    when(rmNode.getHostName()).thenReturn(nodeId.getHost());
+    when(rmNode.getTotalCapability()).thenReturn(capacity);
+    when(rmNode.getRackName()).thenReturn("/default");
+    when(rmNode.getHttpAddress()).thenReturn(nodeId.getHost());
+    when(rmNode.getNodeAddress()).thenReturn(nodeId.getHost());
+
+    return new SchedulerNodeForTest(rmNode);
+  }
+
+  private static RMContainerImpl createRMContainer(long containerId,
+      Resource resource, ExecutionType executionType, RMNode node) {
+    Container container =
+        createContainer(containerId, resource, executionType, node);
+
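+    // a minimal RMContext stub with only the dependencies RMContainerImpl
+    // touches during construction and event handling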
+    Dispatcher dispatcher = new AsyncDispatcher();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    when(rmContext.getSystemMetricsPublisher()).
+        thenReturn(new NoOpSystemMetricPublisher());
+    when(rmContext.getYarnConfiguration()).
+        thenReturn(new YarnConfiguration());
+    when(rmContext.getContainerAllocationExpirer()).
+        thenReturn(new ContainerAllocationExpirer(dispatcher));
+    when(rmContext.getRMApplicationHistoryWriter()).
+        thenReturn(new RMApplicationHistoryWriter());
+
+    return new RMContainerImpl(container, null,
+        container.getId().getApplicationAttemptId(),
+        node.getNodeID(), "test", rmContext);
+  }
+
+  private static Container createContainer(long containerId, Resource resource,
+      ExecutionType executionType, RMNode node) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.
+        newInstance(ApplicationId.newInstance(0, 0), 0);
+    ContainerId cId =
+        ContainerId.newContainerId(appAttemptId, containerId);
+    return Container.newInstance(
+        cId, node.getNodeID(), node.getNodeAddress(), resource,
+        Priority.newInstance(0), null, executionType);
+  }
+
+  /**
+   * A minimal SchedulerNode implementation used solely to exercise the
+   * common SchedulerNode logic. Resource reservation is scheduler-specific
+   * and therefore not covered here.
+   */
+  private static final class SchedulerNodeForTest extends SchedulerNode {
+    SchedulerNodeForTest(RMNode node) {
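+      // second argument is usePortForNodeName, not needed for these tests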
+      super(node, false);
+    }
+
+    @Override
+    public void reserveResource(SchedulerApplicationAttempt attempt,
+        SchedulerRequestKey schedulerKey, RMContainer container) {
+    }
+
+    @Override
+    public void unreserveResource(SchedulerApplicationAttempt attempt) {
+    }
+  }
+}

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -4131,7 +4131,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     // Check total resource of scheduler node is also changed to 1 GB 1 core
     Resource totalResource =
         resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+            .getSchedulerNode(nm_0.getNodeId()).getCapacity();
     Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
         totalResource.getMemorySize());
     Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -229,8 +229,8 @@ public class TestCapacitySchedulerAsyncScheduling {
 
     // nm1 runs 1 container(app1-container_01/AM)
     // nm2 runs 1 container(app1-container_02)
-    Assert.assertEquals(1, sn1.getNumContainers());
-    Assert.assertEquals(1, sn2.getNumContainers());
+    Assert.assertEquals(1, sn1.getNumGuaranteedContainers());
+    Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
 
     // kill app attempt1
     scheduler.handle(
@@ -325,8 +325,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
     //                       app2-container_01/AM)
     // nm2 runs 1 container(app1-container_03)
-    Assert.assertEquals(3, sn1.getNumContainers());
-    Assert.assertEquals(1, sn2.getNumContainers());
+    Assert.assertEquals(3, sn1.getNumGuaranteedContainers());
+    Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
 
     // reserve 1 container(app1-container_04) for app1 on nm1
     ResourceRequest rr2 = ResourceRequest
@@ -639,7 +639,7 @@ public class TestCapacitySchedulerAsyncScheduling {
     // nm1 runs 2 container(container_01/AM, container_02)
     allocateAndLaunchContainers(am, nm1, rm, 1,
         Resources.createResource(6 * GB), 0, 2);
-    Assert.assertEquals(2, sn1.getNumContainers());
+    Assert.assertEquals(2, sn1.getNumGuaranteedContainers());
     Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
 
     // app asks 5 * 2G container

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -385,7 +385,7 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(
-        (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
+        (int)(node_0.getCapacity().getMemorySize() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
   }
 
@@ -684,7 +684,7 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, a.getMetrics().getReservedMB());
     assertEquals(0*GB, a.getMetrics().getAllocatedMB());
-    assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
+    assertEquals((int)(a.getCapacity() * node_0.getCapacity().getMemorySize()),
         a.getMetrics().getAvailableMB());
   }
   @Test

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java

@@ -563,7 +563,7 @@ public class TestNodeLabelContainerAllocation {
       int numContainers) {
     CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
     SchedulerNode node = cs.getSchedulerNode(nodeId);
-    Assert.assertEquals(numContainers, node.getNumContainers());
+    Assert.assertEquals(numContainers, node.getNumGuaranteedContainers());
   }
 
   /**
@@ -1065,7 +1065,7 @@ public class TestNodeLabelContainerAllocation {
     for (int i = 0; i < 50; i++) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
       cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-      if (schedulerNode1.getNumContainers() == 0) {
+      if (schedulerNode1.getNumGuaranteedContainers() == 0) {
         cycleWaited++;
       }
     }
@@ -1131,7 +1131,7 @@ public class TestNodeLabelContainerAllocation {
             CSAMContainerLaunchDiagnosticsConstants.LAST_NODE_PROCESSED_MSG
                 + nodeIdStr + " ( Partition : [x]"));
     Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
-        .getNumContainers());
+        .getNumGuaranteedContainers());
     
     rm1.close();
   }
@@ -1215,7 +1215,7 @@ public class TestNodeLabelContainerAllocation {
     }
     
     // app1 gets all resource in partition=x
-    Assert.assertEquals(10, schedulerNode1.getNumContainers());
+    Assert.assertEquals(10, schedulerNode1.getNumGuaranteedContainers());
 
     // check non-exclusive containers of LeafQueue is correctly updated
     LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
@@ -1943,7 +1943,7 @@ public class TestNodeLabelContainerAllocation {
     }
 
     // app1 gets all resource in partition=x
-    Assert.assertEquals(5, schedulerNode1.getNumContainers());
+    Assert.assertEquals(5, schedulerNode1.getNumGuaranteedContainers());
 
     SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
         .getNodeReport(nm1.getNodeId());
@@ -2043,7 +2043,7 @@ public class TestNodeLabelContainerAllocation {
     }
 
     // app1 gets all resource in partition=x (non-exclusive)
-    Assert.assertEquals(3, schedulerNode1.getNumContainers());
+    Assert.assertEquals(3, schedulerNode1.getNumGuaranteedContainers());
 
     SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
         .getNodeReport(nm1.getNodeId());
@@ -2074,7 +2074,7 @@ public class TestNodeLabelContainerAllocation {
     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // app1 gets all resource in default partition
-    Assert.assertEquals(2, schedulerNode2.getNumContainers());
+    Assert.assertEquals(2, schedulerNode2.getNumGuaranteedContainers());
 
     // 3GB is used from label x quota. 2GB used from default label.
     // So total 2.5 GB is remaining.

+ 41 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestContinuousScheduling.java

@@ -20,16 +20,26 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@@ -48,7 +58,9 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -324,7 +336,9 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
           for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) {
             int i = ThreadLocalRandom.current().nextInt(-30, 30);
             synchronized (scheduler) {
-              node.deductUnallocatedResource(Resource.newInstance(i * 1024, i));
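+              // i may be negative, so this allocation can either grow or
+              // shrink the node's allocated resource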
+              RMContainer container = createRMContainer(
+                  node.getRMNode(), Resource.newInstance(i * 1024, 1));
+              node.allocateContainer(container);
             }
           }
         }
@@ -338,6 +352,32 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
     }
   }
 
+  private static RMContainer createRMContainer(
+      RMNode node, Resource resource) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.
+        newInstance(ApplicationId.newInstance(0, 0), 0);
+    ContainerId cId =
+        ContainerId.newContainerId(appAttemptId, 0);
+    Container container = Container.newInstance(
+        cId, node.getNodeID(), node.getNodeAddress(), resource,
+        Priority.newInstance(0), null, ExecutionType.GUARANTEED);
+
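+    // a minimal RMContext stub sufficient to construct an RMContainerImpl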
+    Dispatcher dispatcher = new AsyncDispatcher();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    when(rmContext.getSystemMetricsPublisher()).
+        thenReturn(new NoOpSystemMetricPublisher());
+    when(rmContext.getYarnConfiguration()).
+        thenReturn(new YarnConfiguration());
+    when(rmContext.getContainerAllocationExpirer()).
+        thenReturn(new ContainerAllocationExpirer(dispatcher));
+    when(rmContext.getRMApplicationHistoryWriter()).
+        thenReturn(new RMApplicationHistoryWriter());
+    return new RMContainerImpl(container, null,
+        container.getId().getApplicationAttemptId(),
+        node.getNodeID(), "test", rmContext);
+  }
+
   @Test
   public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
     scheduler.start();

+ 9 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java

@@ -87,7 +87,7 @@ public class TestFSSchedulerNode {
     while (!Resources.isNone(schedulerNode.getUnallocatedResource())) {
       createDefaultContainer();
       schedulerNode.allocateContainer(containers.get(containers.size() - 1));
-      schedulerNode.containerStarted(containers.get(containers.size() - 1).
+      schedulerNode.containerLaunched(containers.get(containers.size() - 1).
           getContainerId());
     }
   }
@@ -183,9 +183,9 @@ public class TestFSSchedulerNode {
     assertEquals("Nothing should have been allocated, yet",
         Resources.none(), schedulerNode.getAllocatedResource());
     schedulerNode.allocateContainer(containers.get(0));
-    schedulerNode.containerStarted(containers.get(0).getContainerId());
+    schedulerNode.containerLaunched(containers.get(0).getContainerId());
     schedulerNode.allocateContainer(containers.get(1));
-    schedulerNode.containerStarted(containers.get(1).getContainerId());
+    schedulerNode.containerLaunched(containers.get(1).getContainerId());
     schedulerNode.allocateContainer(containers.get(2));
     assertEquals("Container should be allocated",
         Resources.multiply(containers.get(0).getContainer().getResource(), 3.0),
@@ -225,7 +225,7 @@ public class TestFSSchedulerNode {
     schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
     allocateContainers(schedulerNode);
     assertEquals("Container should be allocated",
-        schedulerNode.getTotalResource(),
+        schedulerNode.getCapacity(),
         schedulerNode.getAllocatedResource());
 
     // Release all remaining containers
@@ -266,7 +266,7 @@ public class TestFSSchedulerNode {
     schedulerNode.releaseContainer(containers.get(0).getContainerId(), true);
     allocateContainers(schedulerNode);
     assertEquals("Container should be allocated",
-        schedulerNode.getTotalResource(),
+        schedulerNode.getCapacity(),
         schedulerNode.getAllocatedResource());
 
     // Release all remaining containers
@@ -312,7 +312,7 @@ public class TestFSSchedulerNode {
 
     allocateContainers(schedulerNode);
     assertEquals("Container should be allocated",
-        schedulerNode.getTotalResource(),
+        schedulerNode.getCapacity(),
         schedulerNode.getAllocatedResource());
 
     // Release all containers
@@ -360,7 +360,7 @@ public class TestFSSchedulerNode {
     allocateContainers(schedulerNode);
 
     assertEquals("Container should be allocated",
-        schedulerNode.getTotalResource(),
+        schedulerNode.getCapacity(),
         schedulerNode.getAllocatedResource());
 
     // Release all containers
@@ -399,7 +399,7 @@ public class TestFSSchedulerNode {
     when(starvingApp.isStopped()).thenReturn(true);
     allocateContainers(schedulerNode);
     assertNotEquals("Container should be allocated",
-        schedulerNode.getTotalResource(),
+        schedulerNode.getCapacity(),
         schedulerNode.getAllocatedResource());
 
     // Release all containers
@@ -437,7 +437,7 @@ public class TestFSSchedulerNode {
     // Container partially reassigned
     allocateContainers(schedulerNode);
     assertEquals("Container should be allocated",
-        Resources.subtract(schedulerNode.getTotalResource(),
+        Resources.subtract(schedulerNode.getCapacity(),
             Resource.newInstance(512, 0)),
         schedulerNode.getAllocatedResource());
 

+ 9 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -3299,12 +3299,16 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     scheduler.handle(node2UpdateEvent);
     if (invalid) {
       assertEquals(0, app.getLiveContainers().size());
-      assertEquals(0, scheduler.getNode(node2.getNodeID()).getNumContainers());
-      assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
+      assertEquals(0,
+          scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers());
+      assertEquals(0,
+          scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers());
     } else {
       assertEquals(1, app.getLiveContainers().size());
-      assertEquals(1, scheduler.getNode(node2.getNodeID()).getNumContainers());
-      assertEquals(0, scheduler.getNode(node1.getNodeID()).getNumContainers());
+      assertEquals(1,
+          scheduler.getNode(node2.getNodeID()).getNumGuaranteedContainers());
+      assertEquals(0,
+          scheduler.getNode(node1.getNodeID()).getNumGuaranteedContainers());
     }
   }
 
@@ -5051,7 +5055,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     // Check total resource of scheduler node is also changed to 0 GB 0 core
     Resource totalResource =
         resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+            .getSchedulerNode(nm_0.getNodeId()).getCapacity();
     Assert.assertEquals(totalResource.getMemorySize(), 0 * GB);
     Assert.assertEquals(totalResource.getVirtualCores(), 0);
     // Check the available resource is 0/0

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -344,7 +344,7 @@ public class TestFifoScheduler {
     
     // SchedulerNode's total resource and available resource are changed.
     assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID())
-        .getTotalResource().getMemorySize());
+        .getCapacity().getMemorySize());
     assertEquals(1024, scheduler.getNodeTracker().getNode(node0.getNodeID()).
         getUnallocatedResource().getMemorySize(), 1024);
     QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
@@ -1293,7 +1293,7 @@ public class TestFifoScheduler {
     // Check total resource of scheduler node is also changed to 1 GB 1 core
     Resource totalResource =
         resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+            .getSchedulerNode(nm_0.getNodeId()).getCapacity();
     Assert.assertEquals(totalResource.getMemorySize(), 1 * GB);
     Assert.assertEquals(totalResource.getVirtualCores(), 1);
     // Check the available resource is 0/0