Browse Source

YARN-1506. Changed RMNode/SchedulerNode to update resource with event notification. Contributed by Junping Du

Jian He 10 năm trước cách đây
mục cha
commit
5c14bc426b
28 tập tin đã thay đổi với 617 bổ sung187 xóa
  1. 8 19
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  2. 0 11
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  3. 3 0
      hadoop-yarn-project/CHANGES.txt
  4. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
  5. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java
  6. 21 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java
  7. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java
  8. 28 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
  9. 1 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  10. 0 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  11. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
  12. 70 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  13. 37 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java
  14. 29 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  15. 11 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  16. 0 37
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
  17. 20 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  18. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java
  19. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
  20. 24 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  21. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
  22. 5 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  23. 4 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  24. 85 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java
  25. 73 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
  26. 12 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java
  27. 106 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  28. 12 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

+ 8 - 19
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -55,7 +54,7 @@ public class NodeInfo {
     private String nodeAddr;
     private String httpAddress;
     private int cmdPort;
-    private volatile ResourceOption perNode;
+    private volatile Resource perNode;
     private String rackName;
     private String healthReport;
     private NodeState state;
@@ -63,7 +62,7 @@ public class NodeInfo {
     private List<ApplicationId> toCleanUpApplications;
     
     public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
-        ResourceOption perNode, String rackName, String healthReport,
+        Resource perNode, String rackName, String healthReport,
         int cmdPort, String hostName, NodeState state) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
@@ -111,10 +110,6 @@ public class NodeInfo {
     }
 
     public Resource getTotalCapability() {
-      return perNode.getResource();
-    }
-    
-    public ResourceOption getResourceOption() {
       return perNode;
     }
 
@@ -159,32 +154,26 @@ public class NodeInfo {
       return list;
     }
 
-	@Override
-	public String getNodeManagerVersion() {
-		// TODO Auto-generated method stub
-		return null;
-	}
-
     @Override
-    public void setResourceOption(ResourceOption resourceOption) {
-      perNode = resourceOption;
+    public String getNodeManagerVersion() {
+      return null;
     }
+
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,
-                              final ResourceOption resourceOption, int port) {
+                              final Resource resource, int port) {
     final NodeId nodeId = newNodeID(hostName, port);
     final String nodeAddr = hostName + ":" + port;
     final String httpAddress = hostName;
     
     return new FakeRMNodeImpl(nodeId, nodeAddr, httpAddress,
-        resourceOption, rackName, "Me good",
+        resource, rackName, "Me good",
         port, hostName, null);
   }
   
   public static RMNode newNodeInfo(String rackName, String hostName,
                               final Resource resource) {
-    return newNodeInfo(rackName, hostName, ResourceOption.newInstance(resource,
-        RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), NODE_ID++);
+    return newNodeInfo(rackName, hostName, resource, NODE_ID++);
   }
 }

+ 0 - 11
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -148,14 +147,4 @@ public class RMNodeWrapper implements RMNode {
     return node.getNodeManagerVersion();
   }
 
-  @Override
-  public void setResourceOption(ResourceOption resourceOption) {
-    node.setResourceOption(resourceOption);
-  }
-  
-  @Override
-  public ResourceOption getResourceOption() {
-    return node.getResourceOption();
-  }
-
 }

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -166,6 +166,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2360. Fair Scheduler: Display dynamic fair share for queues on the 
     scheduler page. (Ashwin Shankar and Wei Yan via kasha)
 
+    YARN-1506. Changed RMNode/SchedulerNode to update resource with event
+    notification. (Junping Du via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java

@@ -62,6 +62,8 @@ public abstract class ResourceOption {
   @Evolving
   protected abstract void setOverCommitTimeout(int overCommitTimeout);
   
+  @Private
+  @Evolving
   protected abstract void build();
   
   @Override

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/UpdateNodeResourceResponse.java

@@ -17,9 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
-import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.util.Records;
 
 /**
  * <p>The response sent by the <code>ResourceManager</code> to Admin client on
@@ -30,8 +31,13 @@ import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
  * @see ResourceManagerAdministrationProtocol#updateNodeResource(
  *      UpdateNodeResourceRequest)
  */
-@Public
+@Private
 @Evolving
-public interface UpdateNodeResourceResponse {
+public abstract class UpdateNodeResourceResponse {
+  public static UpdateNodeResourceResponse newInstance(){
+    UpdateNodeResourceResponse response = 
+        Records.newRecord(UpdateNodeResourceResponse.class);
+    return response;
+  }
 
 }

+ 21 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceOptionPBImpl.java

@@ -22,14 +22,15 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProtoOrBuilder;
 
 import com.google.common.base.Preconditions;
 
 public class ResourceOptionPBImpl extends ResourceOption {
 
-  ResourceOptionProto proto = null;
+  ResourceOptionProto proto = ResourceOptionProto.getDefaultInstance();
   ResourceOptionProto.Builder builder = null;
-  private Resource resource = null;
+  boolean viaProto = false;
 
   public ResourceOptionPBImpl() {
     builder = ResourceOptionProto.newBuilder();
@@ -37,39 +38,46 @@ public class ResourceOptionPBImpl extends ResourceOption {
 
   public ResourceOptionPBImpl(ResourceOptionProto proto) {
     this.proto = proto;
-    this.resource = convertFromProtoFormat(proto.getResource());
+    viaProto = true;
   }
   
   public ResourceOptionProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
     return proto;
   }
   
   @Override
   public Resource getResource() {
-    return this.resource;
+    ResourceOptionProtoOrBuilder p = viaProto ? proto : builder;
+    return convertFromProtoFormat(p.getResource());
   }
 
   @Override
   protected void setResource(Resource resource) {
-    if (resource != null) {
-      Preconditions.checkNotNull(builder);
-      builder.setResource(convertToProtoFormat(resource));
-    }
-    this.resource = resource;
+    maybeInitBuilder();
+    builder.setResource(convertToProtoFormat(resource));
   }
 
   @Override
   public int getOverCommitTimeout() {
-    Preconditions.checkNotNull(proto);
-    return proto.getOverCommitTimeout();
+    ResourceOptionProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getOverCommitTimeout();
   }
 
   @Override
   protected void setOverCommitTimeout(int overCommitTimeout) {
-    Preconditions.checkNotNull(builder);
+    maybeInitBuilder();
     builder.setOverCommitTimeout(overCommitTimeout);
   }
   
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ResourceOptionProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+  
   private ResourceProto convertToProtoFormat(
       Resource resource) {
     return ((ResourcePBImpl)resource).getProto();
@@ -83,6 +91,7 @@ public class ResourceOptionPBImpl extends ResourceOption {
   @Override
   protected void build() {
     proto = builder.build();
+    viaProto = true;
     builder = null;
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/UpdateNodeResourceResponsePBImpl.java

@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.UpdateNodeResourceResponseProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 
-public class UpdateNodeResourceResponsePBImpl implements UpdateNodeResourceResponse {
+public class UpdateNodeResourceResponsePBImpl extends UpdateNodeResourceResponse {
 
   UpdateNodeResourceResponseProto proto = UpdateNodeResourceResponseProto.getDefaultInstance();
   UpdateNodeResourceResponseProto.Builder builder = null;

+ 28 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -513,9 +514,20 @@ public class AdminService extends CompositeService implements
     return UserGroupInformation.createRemoteUser(user).getGroupNames();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public UpdateNodeResourceResponse updateNodeResource(
       UpdateNodeResourceRequest request) throws YarnException, IOException {
+    String argName = "updateNodeResource";
+    UserGroupInformation user = checkAcls(argName);
+    
+    if (!isRMActive()) {
+      RMAuditLogger.logFailure(user.getShortUserName(), argName,
+          adminAcl.toString(), "AdminService",
+          "ResourceManager is not active. Can not update node resource.");
+      throwStandbyException();
+    }
+    
     Map<NodeId, ResourceOption> nodeResourceMap = request.getNodeResourceMap();
     Set<NodeId> nodeIds = nodeResourceMap.keySet();
     // verify nodes are all valid first. 
@@ -536,21 +548,31 @@ public class AdminService extends CompositeService implements
     // Notice: it is still possible to have invalid NodeIDs as nodes decommission
     // may happen just at the same time. This time, only log and skip absent
     // nodes without throwing any exceptions.
+    boolean allSuccess = true;
     for (Map.Entry<NodeId, ResourceOption> entry : nodeResourceMap.entrySet()) {
       ResourceOption newResourceOption = entry.getValue();
       NodeId nodeId = entry.getKey();
       RMNode node = this.rmContext.getRMNodes().get(nodeId);
+      
       if (node == null) {
         LOG.warn("Resource update get failed on an unrecognized node: " + nodeId);
+        allSuccess = false;
       } else {
-        node.setResourceOption(newResourceOption);
-        LOG.info("Update resource successfully on node(" + node.getNodeID()
-            +") with resource(" + newResourceOption.toString() + ")");
+        // update resource to RMNode
+        this.rmContext.getDispatcher().getEventHandler()
+          .handle(new RMNodeResourceUpdateEvent(nodeId, newResourceOption));
+        LOG.info("Update resource on node(" + node.getNodeID()
+            + ") with resource(" + newResourceOption.toString() + ")");
+
       }
     }
-    UpdateNodeResourceResponse response = recordFactory.newRecordInstance(
-          UpdateNodeResourceResponse.class);
-      return response;
+    if (allSuccess) {
+      RMAuditLogger.logSuccess(user.getShortUserName(), argName,
+          "AdminService");
+    }
+    UpdateNodeResourceResponse response = 
+        UpdateNodeResourceResponse.newInstance();
+    return response;
   }
 
   private synchronized Configuration getConfiguration(Configuration conf,

+ 1 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -299,8 +298,7 @@ public class ResourceTrackerService extends AbstractService implements
         .getCurrentKey());    
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
-        resolve(host), ResourceOption.newInstance(capability, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
-        nodeManagerVersion);
+        resolve(host), capability, nodeManagerVersion);
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {

+ 0 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -101,18 +101,6 @@ public interface RMNode {
    */
   public Resource getTotalCapability();
   
-  /**
-   * Set resource option with total available resource and overCommitTimoutMillis
-   * @param resourceOption
-   */
-  public void setResourceOption(ResourceOption resourceOption);
-  
-  /**
-   * resource option with total available resource and overCommitTimoutMillis
-   * @return ResourceOption
-   */
-  public ResourceOption getResourceOption();
-  
   /**
    * The rack name for this node manager.
    * @return the rack name.

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java

@@ -24,6 +24,9 @@ public enum RMNodeEventType {
   
   // Source: AdminService
   DECOMMISSION,
+  
+  // Source: AdminService, ResourceTrackerService
+  RESOURCE_UPDATE,
 
   // ResourceTrackerService
   STATUS_UPDATE,

+ 70 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils.ContainerIdComparator;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -96,7 +97,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private int httpPort;
   private final String nodeAddress; // The containerManager address
   private String httpAddress;
-  private volatile ResourceOption resourceOption;
+  private volatile Resource totalCapability;
   private final Node node;
 
   private String healthReport;
@@ -129,6 +130,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
      //Transitions from NEW state
      .addTransition(NodeState.NEW, NodeState.RUNNING, 
          RMNodeEventType.STARTED, new AddNodeTransition())
+     .addTransition(NodeState.NEW, NodeState.NEW,
+         RMNodeEventType.RESOURCE_UPDATE, 
+         new UpdateNodeResourceWhenUnusableTransition())
 
      //Transitions from RUNNING state
      .addTransition(NodeState.RUNNING, 
@@ -149,6 +153,23 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
      .addTransition(NodeState.RUNNING, NodeState.RUNNING,
          RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
+     .addTransition(NodeState.RUNNING, NodeState.RUNNING,
+         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
+
+     //Transitions from REBOOTED state
+     .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
+         RMNodeEventType.RESOURCE_UPDATE, 
+         new UpdateNodeResourceWhenUnusableTransition())
+         
+     //Transitions from DECOMMISSIONED state
+     .addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
+         RMNodeEventType.RESOURCE_UPDATE, 
+         new UpdateNodeResourceWhenUnusableTransition())
+         
+     //Transitions from LOST state
+     .addTransition(NodeState.LOST, NodeState.LOST,
+         RMNodeEventType.RESOURCE_UPDATE, 
+         new UpdateNodeResourceWhenUnusableTransition())
 
      //Transitions from UNHEALTHY state
      .addTransition(NodeState.UNHEALTHY, 
@@ -169,6 +190,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
          RMNodeEventType.CLEANUP_APP, new CleanUpAppTransition())
      .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
          RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
+     .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
+         RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
          
      // create the topology tables
      .installTopology(); 
@@ -177,13 +200,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
                              RMNodeEvent> stateMachine;
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
-      int cmPort, int httpPort, Node node, ResourceOption resourceOption, String nodeManagerVersion) {
+      int cmPort, int httpPort, Node node, Resource capability, String nodeManagerVersion) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
     this.commandPort = cmPort;
     this.httpPort = httpPort;
-    this.resourceOption = resourceOption; 
+    this.totalCapability = capability; 
     this.nodeAddress = hostName + ":" + cmPort;
     this.httpAddress = hostName + ":" + httpPort;
     this.node = node;
@@ -239,17 +262,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
 
   @Override
   public Resource getTotalCapability() {
-    return this.resourceOption.getResource();
-  }
-  
-  @Override
-  public void setResourceOption(ResourceOption resourceOption) {
-    this.resourceOption = resourceOption;
-  }
-  
-  @Override
-  public ResourceOption getResourceOption(){
-    return this.resourceOption;
+    return this.totalCapability;
   }
 
   @Override
@@ -473,6 +486,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     context.getDispatcher().getEventHandler()
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
+  
+  private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, 
+     RMNodeResourceUpdateEvent event){
+      ResourceOption resourceOption = event.getResourceOption();
+      // Set resource on RMNode
+      rmNode.totalCapability = resourceOption.getResource();
+  }
 
   public static class AddNodeTransition implements
       SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@@ -526,8 +546,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.nodeManagerVersion = newNode.getNodeManagerVersion();
       rmNode.httpPort = newNode.getHttpPort();
       rmNode.httpAddress = newNode.getHttpAddress();
-      rmNode.resourceOption = newNode.getResourceOption();
-
+      rmNode.totalCapability = newNode.getTotalCapability();
+      
       // Reset heartbeat ID since node just restarted.
       rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
 
@@ -540,9 +560,43 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodesListManagerEvent(
               NodesListManagerEventType.NODE_USABLE, rmNode));
+      if (rmNode.getState().equals(NodeState.RUNNING)) {
+        // Update scheduler node's capacity for reconnect node.
+        rmNode.context.getDispatcher().getEventHandler().handle(
+            new NodeResourceUpdateSchedulerEvent(rmNode, 
+                ResourceOption.newInstance(rmNode.totalCapability, -1)));
+      }
+      
     }
   }
+  
+  public static class UpdateNodeResourceWhenRunningTransition
+      implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      RMNodeResourceUpdateEvent updateEvent = (RMNodeResourceUpdateEvent)event;
+      updateNodeResourceFromEvent(rmNode, updateEvent);
+      // Notify new resourceOption to scheduler
+      rmNode.context.getDispatcher().getEventHandler().handle(
+          new NodeResourceUpdateSchedulerEvent(rmNode, updateEvent.getResourceOption()));
+    }
+  }
+  
+  public static class UpdateNodeResourceWhenUnusableTransition
+      implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
+
+    @Override
+    public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
+      // The node is not usable, only log a warn message
+      LOG.warn("Try to update resource on a "+ rmNode.getState().toString() +
+          " node: "+rmNode.toString());
+      updateNodeResourceFromEvent(rmNode, (RMNodeResourceUpdateEvent)event);
+      // No need to notify scheduler as schedulerNode is not function now
+      // and can sync later from RMnode.
+    }
+  }
+  
   public static class CleanUpAppTransition
     implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
 

+ 37 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeResourceUpdateEvent.java

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+
+public class RMNodeResourceUpdateEvent extends RMNodeEvent {
+
+  private final ResourceOption resourceOption;
+  
+  public RMNodeResourceUpdateEvent(NodeId nodeId, ResourceOption resourceOption) {
+    super(nodeId, RMNodeEventType.RESOURCE_UPDATE);
+    this.resourceOption = resourceOption;
+  }
+
+  public ResourceOption getResourceOption() {
+    return resourceOption;
+  }
+
+}

+ 29 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -473,4 +474,32 @@ public abstract class AbstractYarnScheduler
           .handle(new RMAppEvent(app.getApplicationId(), RMAppEventType.KILL));
     }
   }
+  
+  /**
+   * Process resource update on a node.
+   */
+  public synchronized void updateNodeResource(RMNode nm, 
+      ResourceOption resourceOption) {
+  
+    SchedulerNode node = getSchedulerNode(nm.getNodeID());
+    Resource newResource = resourceOption.getResource();
+    Resource oldResource = node.getTotalResource();
+    if(!oldResource.equals(newResource)) {
+      // Log resource change
+      LOG.info("Update resource on node: " + node.getNodeName() 
+          + " from: " + oldResource + ", to: "
+          + newResource);
+
+      // update resource to node
+      node.setTotalResource(newResource);
+    
+      // update resource to clusterResource
+      Resources.subtractFrom(clusterResource, oldResource);
+      Resources.addTo(clusterResource, newResource);
+    } else {
+      // Log resource change
+      LOG.warn("Update resource on node: " + node.getNodeName() 
+          + " with the same resource: " + newResource);
+    }
+  }
 }

+ 11 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -77,6 +77,16 @@ public abstract class SchedulerNode {
     return this.rmNode;
   }
 
+  /**
+   * Set total resources on the node.
+   * @param resource total resources on the node.
+   */
+  public synchronized void setTotalResource(Resource resource){
+    this.totalResourceCapability = resource;
+    this.availableResource = Resources.subtract(totalResourceCapability,
+      this.usedResource);
+  }
+  
   /**
    * Get the ID of the node which contains both its hostname and port.
    * 
@@ -158,7 +168,7 @@ public abstract class SchedulerNode {
    * 
    * @return total resources on the node.
    */
-  public Resource getTotalResource() {
+  public synchronized Resource getTotalResource() {
     return this.totalResourceCapability;
   }
 
@@ -259,19 +269,6 @@ public abstract class SchedulerNode {
     this.reservedContainer = reservedContainer;
   }
 
-  /**
-   * Apply delta resource on node's available resource.
-   * 
-   * @param deltaResource
-   *          the delta of resource need to apply to node
-   */
-  public synchronized void
-      applyDeltaOnAvailableResource(Resource deltaResource) {
-    // we can only adjust available resource if total resource is changed.
-    Resources.addTo(this.availableResource, deltaResource);
-  }
-
-
   public synchronized void recoverContainer(RMContainer rmContainer) {
     if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
       return;

+ 0 - 37
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.List;
 
-import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -147,42 +146,6 @@ public class SchedulerUtils {
     ask.setCapability(normalized);
   }
   
-  /**
-   * Update resource in SchedulerNode if any resource change in RMNode.
-   * @param node SchedulerNode with old resource view
-   * @param rmNode RMNode with new resource view
-   * @param clusterResource the cluster's resource that need to update
-   * @param log Scheduler's log for resource change
-   * @return true if the resources have changed
-   */
-  public static boolean updateResourceIfChanged(SchedulerNode node,
-      RMNode rmNode, Resource clusterResource, Log log) {
-    boolean result = false;
-    Resource oldAvailableResource = node.getAvailableResource();
-    Resource newAvailableResource = Resources.subtract(
-        rmNode.getTotalCapability(), node.getUsedResource());
-    
-    if (!newAvailableResource.equals(oldAvailableResource)) {
-      result = true;
-      Resource deltaResource = Resources.subtract(newAvailableResource,
-          oldAvailableResource);
-      // Reflect resource change to scheduler node.
-      node.applyDeltaOnAvailableResource(deltaResource);
-      // Reflect resource change to clusterResource.
-      Resources.addTo(clusterResource, deltaResource);
-      // TODO process resource over-commitment case (allocated containers
-      // > total capacity) in different option by getting value of
-      // overCommitTimeoutMillis.
-      
-      // Log resource change
-      log.info("Resource change on node: " + rmNode.getNodeAddress() 
-          + " with delta: CPU: " + deltaResource.getMemory() + "core, Memory: "
-          + deltaResource.getMemory() +"MB");
-    }
-
-    return result;
-  }
-
   /**
    * Utility method to normalize a list of resource requests, by insuring that
    * the memory for each request is a multiple of minMemory and is not zero.

+ 20 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -50,6 +50,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -866,12 +869,6 @@ public class CapacityScheduler extends
 
     FiCaSchedulerNode node = getNode(nm.getNodeID());
     
-    // Update resource if any change
-    if (SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource,
-        LOG)) {
-      root.updateClusterResource(clusterResource);
-    }
-    
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -899,6 +896,15 @@ public class CapacityScheduler extends
         + " availableResource: " + node.getAvailableResource());
     }
   }
+  
+  /**
+   * Process resource update on a node.
+   */
+  private synchronized void updateNodeAndQueueResource(RMNode nm, 
+      ResourceOption resourceOption) {
+    updateNodeResource(nm, resourceOption);
+    root.updateClusterResource(clusterResource);
+  }
 
   private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
 
@@ -969,6 +975,14 @@ public class CapacityScheduler extends
       removeNode(nodeRemovedEvent.getRemovedRMNode());
     }
     break;
+    case NODE_RESOURCE_UPDATE:
+    {
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
+          (NodeResourceUpdateSchedulerEvent)event;
+      updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(),
+        nodeResourceUpdatedEvent.getResourceOption());
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeResourceUpdateSchedulerEvent.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+public class NodeResourceUpdateSchedulerEvent extends SchedulerEvent {
+
+  private final RMNode rmNode;
+  private final ResourceOption resourceOption;
+  
+  public NodeResourceUpdateSchedulerEvent(RMNode rmNode,
+      ResourceOption resourceOption) {
+    super(SchedulerEventType.NODE_RESOURCE_UPDATE);
+    this.rmNode = rmNode;
+    this.resourceOption = resourceOption;
+  }
+
+  public RMNode getRMNode() {
+    return rmNode;
+  }
+
+  public ResourceOption getResourceOption() {
+    return resourceOption;
+  }
+
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java

@@ -24,6 +24,7 @@ public enum SchedulerEventType {
   NODE_ADDED,
   NODE_REMOVED,
   NODE_UPDATE,
+  NODE_RESOURCE_UPDATE,
 
   // Source: RMApp
   APP_ADDED,

+ 24 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -956,7 +958,7 @@ public class FairScheduler extends
         allocation.getNMTokenList());
     }
   }
-
+  
   /**
    * Process a heartbeat update from a node.
    */
@@ -967,9 +969,6 @@ public class FairScheduler extends
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
-
-    // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, nm, clusterResource, LOG);
     
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
@@ -1173,6 +1172,15 @@ public class FairScheduler extends
       removeApplication(appRemovedEvent.getApplicationID(),
         appRemovedEvent.getFinalState());
       break;
+    case NODE_RESOURCE_UPDATE:
+      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
+          (NodeResourceUpdateSchedulerEvent)event;
+      updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+            nodeResourceUpdatedEvent.getResourceOption());
+      break;
     case APP_ATTEMPT_ADDED:
       if (!(event instanceof AppAttemptAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
@@ -1534,4 +1542,16 @@ public class FairScheduler extends
     }
     return queue1; // names are identical
   }
+  
+  /**
+   * Process resource update on a node and update Queue.
+   */
+  @Override
+  public synchronized void updateNodeResource(RMNode nm, 
+      ResourceOption resourceOption) {
+    super.updateNodeResource(nm, resourceOption);
+    updateRootQueueMetrics();
+    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+    queueMgr.getRootQueue().recomputeSteadyShares();
+  }
 }

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -681,9 +682,6 @@ public class FifoScheduler extends
   private synchronized void nodeUpdate(RMNode rmNode) {
     FiCaSchedulerNode node = getNode(rmNode.getNodeID());
     
-    // Update resource if any change
-    SchedulerUtils.updateResourceIfChanged(node, rmNode, clusterResource, LOG);
-    
     List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
@@ -750,6 +748,14 @@ public class FifoScheduler extends
       removeNode(nodeRemovedEvent.getRemovedRMNode());
     }
     break;
+    case NODE_RESOURCE_UPDATE:
+    {
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
+          (NodeResourceUpdateSchedulerEvent)event;
+      updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+        nodeResourceUpdatedEvent.getResourceOption());
+    }
+    break;
     case NODE_UPDATE:
     {
       NodeUpdateSchedulerEvent nodeUpdatedEvent = 

+ 5 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -94,14 +93,14 @@ public class MockNodes {
     private String nodeAddr;
     private String httpAddress;
     private int cmdPort;
-    private ResourceOption perNode;
+    private Resource perNode;
     private String rackName;
     private String healthReport;
     private long lastHealthReportTime;
     private NodeState state;
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
-        ResourceOption perNode, String rackName, String healthReport,
+        Resource perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState state) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
@@ -147,7 +146,7 @@ public class MockNodes {
 
     @Override
     public Resource getTotalCapability() {
-      return this.perNode.getResource();
+      return this.perNode;
     }
 
     @Override
@@ -203,16 +202,6 @@ public class MockNodes {
     public long getLastHealthReportTime() {
       return lastHealthReportTime;
     }
-
-    @Override
-    public void setResourceOption(ResourceOption resourceOption) {
-      this.perNode = resourceOption;
-    }
-    
-    @Override
-    public ResourceOption getResourceOption(){
-      return this.perNode;
-    }
     
   };
 
@@ -232,9 +221,8 @@ public class MockNodes {
 
     final String httpAddress = httpAddr;
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
-    return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress,
-        ResourceOption.newInstance(perNode, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT),
-        rackName, healthReport, 0, nid, hostName, state); 
+    return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
+        rackName, healthReport, 0, nid, hostName, state);
   }
 
   public static RMNode nodeInfo(int rack, final Resource perNode,

+ 4 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -457,7 +457,6 @@ public class MockRM extends ResourceManager {
 
   @Override
   protected ResourceTrackerService createResourceTrackerService() {
-    Configuration conf = new Configuration();
 
     RMContainerTokenSecretManager containerTokenSecretManager =
         getRMContext().getContainerTokenSecretManager();
@@ -547,6 +546,10 @@ public class MockRM extends ResourceManager {
   public RMAppManager getRMAppManager() {
     return this.rmAppManager;
   }
+  
+  public AdminService getAdminService() {
+    return this.adminService;
+  }
 
   @Override
   protected void startWepApp() {

+ 85 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java

@@ -23,7 +23,9 @@ import static org.junit.Assert.fail;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,9 +37,13 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -509,6 +515,85 @@ public class TestFifoScheduler {
     rm.stop();
   }
   
+  @Test
+  public void testResourceOverCommit() throws Exception {
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    
+    RMApp app1 = rm.submitApp(2048);
+    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+        nm1.getNodeId());
+    // check node report, 2 GB used and 2 GB available
+    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers
+    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // kick the scheduler, 2 GB given to AM1, resource remaining 0
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(1000);
+      alloc1Response = am1.schedule();
+    }
+
+    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
+    Assert.assertEquals(1, allocated1.size());
+    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
+    Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
+    
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    // check node report, 4 GB used and 0 GB available
+    Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
+    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
+
+    // check container is assigned with 2 GB.
+    Container c1 = allocated1.get(0);
+    Assert.assertEquals(2 * GB, c1.getResource().getMemory());
+    
+    // update node resource to 2 GB, so resource is over-consumed.
+    Map<NodeId, ResourceOption> nodeResourceMap = 
+        new HashMap<NodeId, ResourceOption>();
+    nodeResourceMap.put(nm1.getNodeId(), 
+        ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
+    UpdateNodeResourceRequest request = 
+        UpdateNodeResourceRequest.newInstance(nodeResourceMap);
+    AdminService as = rm.adminService;
+    as.updateNodeResource(request);
+    
+    // Now, the used resource is still 4 GB, and available resource is minus value.
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory());
+    
+    // Check container can complete successfully in case of resource over-commitment.
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0);
+    nm1.containerStatus(containerStatus);
+    int waitCount = 0;
+    while (attempt1.getJustFinishedContainers().size() < 1
+        && waitCount++ != 20) {
+      LOG.info("Waiting for containers to be finished for app 1... Tried "
+          + waitCount + " times already..");
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
+    Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
+    // As container return 2 GB back, the available resource becomes 0 again.
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+    rm.stop();
+  }
 
   public static void main(String[] args) throws Exception {
     TestFifoScheduler t = new TestFifoScheduler();

+ 73 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@@ -463,8 +465,7 @@ public class TestRMNodeTransitions {
     NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
     Resource capability = Resource.newInstance(4096, 4);
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
-        null, ResourceOption.newInstance(capability,
-            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
+        null, capability, nmVersion);
     node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
     Assert.assertEquals(NodeState.RUNNING, node.getState());
     return node;
@@ -486,6 +487,25 @@ public class TestRMNodeTransitions {
     RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
     return node;
   }
+  
+  private RMNodeImpl getNewNode(Resource capability) {
+    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, 
+        capability, null);
+    return node;
+  }
+  
+  private RMNodeImpl getRebootedNode() {
+    NodeId nodeId = BuilderUtils.newNodeId("localhost", 0);
+    Resource capability = Resource.newInstance(4096, 4);
+    RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
+        null, capability, null);
+    node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.REBOOTING));
+    Assert.assertEquals(NodeState.REBOOTED, node.getState());
+    return node;
+  }
 
   @Test
   public void testAdd() {
@@ -534,6 +554,57 @@ public class TestRMNodeTransitions {
     Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
         nodesListManagerEvent.getType());
   }
+  
+  @Test
+  public void testResourceUpdateOnRunningNode() {
+    RMNodeImpl node = getRunningNode();
+    Resource oldCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
+    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
+    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
+        ResourceOption.newInstance(Resource.newInstance(2048, 2), 
+            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    Resource newCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
+    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
+    
+    Assert.assertEquals(NodeState.RUNNING, node.getState());
+    Assert.assertNotNull(nodesListManagerEvent);
+    Assert.assertEquals(NodesListManagerEventType.NODE_USABLE,
+        nodesListManagerEvent.getType());
+  }
+  
+  @Test
+  public void testResourceUpdateOnNewNode() {
+    RMNodeImpl node = getNewNode(Resource.newInstance(4096, 4));
+    Resource oldCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
+    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
+    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
+        ResourceOption.newInstance(Resource.newInstance(2048, 2), 
+            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    Resource newCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
+    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
+    
+    Assert.assertEquals(NodeState.NEW, node.getState());
+  }
+  
+  @Test
+  public void testResourceUpdateOnRebootedNode() {
+    RMNodeImpl node = getRebootedNode();
+    Resource oldCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", oldCapacity.getMemory(), 4096);
+    assertEquals("CPU resource is not match.", oldCapacity.getVirtualCores(), 4);
+    node.handle(new RMNodeResourceUpdateEvent(node.getNodeID(),
+        ResourceOption.newInstance(Resource.newInstance(2048, 2), 
+            RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
+    Resource newCapacity = node.getTotalCapability();
+    assertEquals("Memory resource is not match.", newCapacity.getMemory(), 2048);
+    assertEquals("CPU resource is not match.", newCapacity.getVirtualCores(), 2);
+    
+    Assert.assertEquals(NodeState.REBOOTED, node.getState());
+  }
 
   @Test
   public void testReconnnectUpdate() {

+ 12 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestNMReconnect.java

@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,14 +50,14 @@ public class TestNMReconnect {
   private static final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
 
-  private RMNodeEvent rmNodeEvent = null;
+  private List<RMNodeEvent> rmNodeEvents = new ArrayList<RMNodeEvent>();
 
   private class TestRMNodeEventDispatcher implements
       EventHandler<RMNodeEvent> {
 
     @Override
     public void handle(RMNodeEvent event) {
-      rmNodeEvent = event;
+      rmNodeEvents.add(event);
     }
 
   }
@@ -109,16 +112,18 @@ public class TestNMReconnect {
     request1.setResource(capability);
     resourceTrackerService.registerNodeManager(request1);
 
-    Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvent.getType());
+    Assert.assertEquals(RMNodeEventType.STARTED, rmNodeEvents.get(0).getType());
 
-    rmNodeEvent = null;
+    rmNodeEvents.clear();
     resourceTrackerService.registerNodeManager(request1);
-    Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
+    Assert.assertEquals(RMNodeEventType.RECONNECTED,
+        rmNodeEvents.get(0).getType());
 
-    rmNodeEvent = null;
+    rmNodeEvents.clear();
     resourceTrackerService.registerNodeManager(request1);
     capability = BuilderUtils.newResource(1024, 2);
     request1.setResource(capability);
-    Assert.assertEquals(RMNodeEventType.RECONNECTED, rmNodeEvent.getType());
+    Assert.assertEquals(RMNodeEventType.RECONNECTED,
+        rmNodeEvents.get(0).getType());
   }
 }

+ 106 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -47,23 +47,30 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -90,6 +97,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -591,7 +599,6 @@ public class TestCapacityScheduler {
     return result;
   }
 
-  @SuppressWarnings("resource")
   @Test
   public void testBlackListNodes() throws Exception {
     Configuration conf = new Configuration();
@@ -627,6 +634,104 @@ public class TestCapacityScheduler {
     Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
     rm.stop();
   }
+  
+  @Test
+  public void testResourceOverCommit() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    RMApp app1 = rm.submitApp(2048);
+    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
+        nm1.getNodeId());
+    // check node report, 2 GB used and 2 GB available
+    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
+
+    // add request for containers
+    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
+    AllocateResponse alloc1Response = am1.schedule(); // send the request
+
+    // kick the scheduler, 2 GB given to AM1, resource remaining 0
+    nm1.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().size() < 1) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(100);
+      alloc1Response = am1.schedule();
+    }
+
+    List<Container> allocated1 = alloc1Response.getAllocatedContainers();
+    Assert.assertEquals(1, allocated1.size());
+    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
+    Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
+    
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    // check node report, 4 GB used and 0 GB available
+    Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
+    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
+
+    // check container is assigned with 2 GB.
+    Container c1 = allocated1.get(0);
+    Assert.assertEquals(2 * GB, c1.getResource().getMemory());
+    
+    // update node resource to 2 GB, so resource is over-consumed.
+    Map<NodeId, ResourceOption> nodeResourceMap = 
+        new HashMap<NodeId, ResourceOption>();
+    nodeResourceMap.put(nm1.getNodeId(), 
+        ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
+    UpdateNodeResourceRequest request = 
+        UpdateNodeResourceRequest.newInstance(nodeResourceMap);
+    AdminService as = ((MockRM)rm).getAdminService();
+    as.updateNodeResource(request);
+    
+    // Now, the used resource is still 4 GB, and available resource is minus value.
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemory());
+    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemory());
+    
+    // Check container can complete successfully in case of resource over-commitment.
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0);
+    nm1.containerStatus(containerStatus);
+    int waitCount = 0;
+    while (attempt1.getJustFinishedContainers().size() < 1
+        && waitCount++ != 20) {
+      LOG.info("Waiting for containers to be finished for app 1... Tried "
+          + waitCount + " times already..");
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
+    Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
+    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
+    // As container return 2 GB back, the available resource becomes 0 again.
+    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
+    
+    // Verify no NPE is trigger in schedule after resource is updated.
+    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
+    alloc1Response = am1.schedule();
+    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
+        0, alloc1Response.getAllocatedContainers().size());
+    int times = 0;
+    // try 10 times as scheduling is async process.
+    while (alloc1Response.getAllocatedContainers().size() < 1
+        && times++ < 10) {
+      LOG.info("Waiting for containers to be allocated for app 1... Tried "
+          + times + " times already..");
+      Thread.sleep(100);
+    }
+    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
+        0, alloc1Response.getAllocatedContainers().size());
+    rm.stop();
+  }
 
     @Test (timeout = 5000)
     public void testApplicationComparator()

+ 12 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@@ -278,17 +279,16 @@ public class TestFifoScheduler {
         (Map<NodeId, FiCaSchedulerNode>) method.invoke(scheduler);
     assertEquals(schedulerNodes.values().size(), 1);
     
-    // set resource of RMNode to 1024 and verify it works.
-    node0.setResourceOption(ResourceOption.newInstance(
-        Resources.createResource(1024, 4), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
-    assertEquals(node0.getTotalCapability().getMemory(), 1024);
-    // verify that SchedulerNode's resource hasn't been changed.
-    assertEquals(schedulerNodes.get(node0.getNodeID()).
-        getAvailableResource().getMemory(), 2048);
-    // now, NM heartbeat comes.
-    NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
-    scheduler.handle(node0Update);
-    // SchedulerNode's available resource is changed.
+    Resource newResource = Resources.createResource(1024, 4);
+    
+    NodeResourceUpdateSchedulerEvent node0ResourceUpdate = new 
+        NodeResourceUpdateSchedulerEvent(node0, ResourceOption.newInstance(
+            newResource, RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT));
+    scheduler.handle(node0ResourceUpdate);
+    
+    // SchedulerNode's total resource and available resource are changed.
+    assertEquals(schedulerNodes.get(node0.getNodeID()).getTotalResource()
+        .getMemory(), 1024);
     assertEquals(schedulerNodes.get(node0.getNodeID()).
         getAvailableResource().getMemory(), 1024);
     QueueInfo queueInfo = scheduler.getQueueInfo(null, false, false);
@@ -324,6 +324,7 @@ public class TestFifoScheduler {
     // Before the node update event, there are one local request
     Assert.assertEquals(1, nodeLocal.getNumContainers());
 
+    NodeUpdateSchedulerEvent node0Update = new NodeUpdateSchedulerEvent(node0);
     // Now schedule.
     scheduler.handle(node0Update);
 
@@ -544,7 +545,6 @@ public class TestFifoScheduler {
     LOG.info("--- END: testFifoScheduler ---");
   }
 
-  @SuppressWarnings("resource")
   @Test
   public void testBlackListNodes() throws Exception {
     Configuration conf = new Configuration();