Browse Source

YARN-6979. Add flag to notify all types of container updates to NM via NodeHeartbeatResponse. (Kartheek Muthyala via asuresh)

Arun Suresh 7 years ago
parent
commit
8410d862d3
23 changed files with 317 additions and 97 deletions
  1. 1 1
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
  2. 1 1
      hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
  3. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  6. 21 21
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  7. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  8. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
  9. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  10. 20 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  11. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
  12. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  13. 3 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
  14. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  15. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java
  16. 16 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  17. 15 10
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java
  18. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  19. 28 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  20. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  21. 168 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
  22. 2 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
  23. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java

+ 1 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java

@@ -179,7 +179,7 @@ public class NodeInfo {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForContainersDecreasing(
+    public void updateNodeHeartbeatResponseForUpdatedContainers(
         NodeHeartbeatResponse response) {
       // TODO Auto-generated method stub
       

+ 1 - 1
hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java

@@ -168,7 +168,7 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
+  public void updateNodeHeartbeatResponseForUpdatedContainers(
       NodeHeartbeatResponse response) {
     // TODO Auto-generated method stub
   }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -167,6 +167,10 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_APPLICATION_MASTER_SERVICE_PROCESSORS =
       RM_PREFIX + "application-master-service.processors";
 
+  public static final String RM_AUTO_UPDATE_CONTAINERS =
+      RM_PREFIX + "auto-update.containers";
+  public static final boolean DEFAULT_RM_AUTO_UPDATE_CONTAINERS = false;
+
   /** The actual bind address for the RM.*/
   public static final String RM_BIND_HOST =
     RM_PREFIX + "bind-host";

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -72,6 +72,14 @@
     <value></value>
   </property>
 
+  <property>
+    <description>
+      If set to true, then ALL container updates will be automatically sent to
+      the NM in the next heartbeat</description>
+    <name>yarn.resourcemanager.auto-update.containers</name>
+    <value>false</value>
+  </property>
+
   <property>
     <description>The number of threads used to handle applications manager requests.</description>
     <name>yarn.resourcemanager.client.thread-count</name>

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -104,10 +104,10 @@ public abstract class NodeHeartbeatResponse {
 
   public abstract void setResource(Resource resource);
 
-  public abstract List<Container> getContainersToDecrease();
+  public abstract List<Container> getContainersToUpdate();
 
-  public abstract void addAllContainersToDecrease(
-      Collection<Container> containersToDecrease);
+  public abstract void addAllContainersToUpdate(
+      Collection<Container> containersToUpdate);
 
   public abstract ContainerQueuingLimit getContainerQueuingLimit();
 

+ 21 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -75,7 +75,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
   private ContainerQueuingLimit containerQueuingLimit = null;
-  private List<Container> containersToDecrease = null;
+  private List<Container> containersToUpdate = null;
   private List<SignalContainerRequest> containersToSignal = null;
 
   public NodeHeartbeatResponsePBImpl() {
@@ -119,8 +119,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     if (this.systemCredentials != null) {
       addSystemCredentialsToProto();
     }
-    if (this.containersToDecrease != null) {
-      addContainersToDecreaseToProto();
+    if (this.containersToUpdate != null) {
+      addContainersToUpdateToProto();
     }
     if (this.containersToSignal != null) {
       addContainersToSignalToProto();
@@ -499,39 +499,39 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
     builder.addAllApplicationsToCleanup(iterable);
   }
 
-  private void initContainersToDecrease() {
-    if (this.containersToDecrease != null) {
+  private void initContainersToUpdate() {
+    if (this.containersToUpdate != null) {
       return;
     }
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<ContainerProto> list = p.getContainersToDecreaseList();
-    this.containersToDecrease = new ArrayList<>();
+    List<ContainerProto> list = p.getContainersToUpdateList();
+    this.containersToUpdate = new ArrayList<>();
 
     for (ContainerProto c : list) {
-      this.containersToDecrease.add(convertFromProtoFormat(c));
+      this.containersToUpdate.add(convertFromProtoFormat(c));
     }
   }
 
   @Override
-  public List<Container> getContainersToDecrease() {
-    initContainersToDecrease();
-    return this.containersToDecrease;
+  public List<Container> getContainersToUpdate() {
+    initContainersToUpdate();
+    return this.containersToUpdate;
   }
 
   @Override
-  public void addAllContainersToDecrease(
-      final Collection<Container> containersToDecrease) {
-    if (containersToDecrease == null) {
+  public void addAllContainersToUpdate(
+      final Collection<Container> containersToBeUpdated) {
+    if (containersToBeUpdated == null) {
       return;
     }
-    initContainersToDecrease();
-    this.containersToDecrease.addAll(containersToDecrease);
+    initContainersToUpdate();
+    this.containersToUpdate.addAll(containersToBeUpdated);
   }
 
-  private void addContainersToDecreaseToProto() {
+  private void addContainersToUpdateToProto() {
     maybeInitBuilder();
-    builder.clearContainersToDecrease();
-    if (this.containersToDecrease == null) {
+    builder.clearContainersToUpdate();
+    if (this.containersToUpdate == null) {
       return;
     }
     Iterable<ContainerProto> iterable = new
@@ -539,7 +539,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
       @Override
       public Iterator<ContainerProto> iterator() {
         return new Iterator<ContainerProto>() {
-          private Iterator<Container> iter = containersToDecrease.iterator();
+          private Iterator<Container> iter = containersToUpdate.iterator();
           @Override
           public boolean hasNext() {
             return iter.hasNext();
@@ -555,7 +555,7 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
         };
       }
     };
-    builder.addAllContainersToDecrease(iterable);
+    builder.addAllContainersToUpdate(iterable);
   }
 
   @Override

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -111,11 +111,14 @@ message NodeHeartbeatResponseProto {
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
   repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
+  // to be deprecated in favour of containers_to_update
   repeated ContainerProto containers_to_decrease = 12;
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional ResourceProto resource = 14;
   optional ContainerQueuingLimitProto container_queuing_limit = 15;
   repeated AppCollectorsMapProto app_collectors_map = 16;
+  // to be used in place of containers_to_decrease
+  repeated ContainerProto containers_to_update = 17;
 }
 
 message ContainerQueuingLimitProto {

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java

@@ -180,14 +180,14 @@ public class TestYarnServerApiClasses {
   @Test
   public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
     NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
-    original.addAllContainersToDecrease(
+    original.addAllContainersToUpdate(
         Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
             getDecreasedContainer(2, 3, 1024, 1)));
     NodeHeartbeatResponsePBImpl copy =
         new NodeHeartbeatResponsePBImpl(original.getProto());
-    assertEquals(1, copy.getContainersToDecrease().get(0)
+    assertEquals(1, copy.getContainersToUpdate().get(0)
         .getId().getContainerId());
-    assertEquals(1024, copy.getContainersToDecrease().get(1)
+    assertEquals(1024, copy.getContainersToUpdate().get(1)
         .getResource().getMemorySize());
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -1099,7 +1099,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   parseCredentials(systemCredentials));
             }
             List<org.apache.hadoop.yarn.api.records.Container>
-                containersToDecrease = response.getContainersToDecrease();
+                containersToDecrease = response.getContainersToUpdate();
             if (!containersToDecrease.isEmpty()) {
               dispatcher.getEventHandler().handle(
                   new CMgrDecreaseContainersResourceEvent(

+ 20 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -166,6 +166,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1241,10 +1242,19 @@ public class ContainerManagerImpl extends CompositeService implements
     org.apache.hadoop.yarn.server.nodemanager.
         containermanager.container.ContainerState currentState =
         container.getContainerState();
-    if (currentState != org.apache.hadoop.yarn.server.
-            nodemanager.containermanager.container.ContainerState.RUNNING &&
-        currentState != org.apache.hadoop.yarn.server.
-            nodemanager.containermanager.container.ContainerState.SCHEDULED) {
+    EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
+        .container.ContainerState> allowedStates = EnumSet.of(
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RUNNING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.SCHEDULED,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.LOCALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.REINITIALIZING,
+        org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+            .ContainerState.RELAUNCHING);
+    if (!allowedStates.contains(currentState)) {
       throw RPCUtil.getRemoteException("Container " + containerId.toString()
           + " is in " + currentState.name() + " state."
           + " Resource can only be changed when a container is in"
@@ -1279,17 +1289,12 @@ public class ContainerManagerImpl extends CompositeService implements
             org.apache.hadoop.yarn.api.records.Container.newInstance(
                 containerId, null, null, targetResource, null, null,
                 currentExecType);
-      } else {
-        increasedContainer =
-            org.apache.hadoop.yarn.api.records.Container.newInstance(
-                containerId, null, null, currentResource, null, null,
-                targetExecType);
-      }
-      if (context.getIncreasedContainers().putIfAbsent(containerId,
-          increasedContainer) != null){
-        throw RPCUtil.getRemoteException("Container " + containerId.toString()
-            + " resource is being increased -or- " +
-            "is undergoing ExecutionType promoted.");
+        if (context.getIncreasedContainers().putIfAbsent(containerId,
+            increasedContainer) != null){
+          throw RPCUtil.getRemoteException("Container " + containerId.toString()
+              + " resource is being increased -or- " +
+              "is undergoing ExecutionType promoted.");
+        }
       }
     }
     this.readLock.lock();

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java

@@ -173,6 +173,7 @@ public class ContainerScheduler extends AbstractService implements
             new ChangeMonitoringContainerResourceEvent(containerId,
                 updateEvent.getUpdatedToken().getResource()));
       } else {
+        // Is Queued or localizing..
         updateEvent.getContainer().setContainerTokenIdentifier(
             updateEvent.getUpdatedToken());
       }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -551,7 +551,7 @@ public class ResourceTrackerService extends AbstractService implements
             getResponseId() + 1, NodeAction.NORMAL, null, null, null, null,
             nextHeartBeatInterval);
     rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
-    rmNode.updateNodeHeartbeatResponseForContainersDecreasing(
+    rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
         nodeHeartBeatResponse);
 
     populateKeys(request, nodeHeartBeatResponse);

+ 3 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode
-    .RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -284,7 +283,6 @@ public class RMContainerImpl implements RMContainer {
   @Override
   public RMContainerState getState() {
     this.readLock.lock();
-
     try {
       return this.stateMachine.getCurrentState();
     } finally {
@@ -598,7 +596,7 @@ public class RMContainerImpl implements RMContainer {
       RMContainerUpdatesAcquiredEvent acquiredEvent =
           (RMContainerUpdatesAcquiredEvent) event;
       if (acquiredEvent.isIncreasedContainer()) {
-        // If container is increased but not acquired by AM, we will start
+        // If container is increased but not started by AM, we will start
         // containerAllocationExpirer for this container in this transition. 
         container.containerAllocationExpirer.register(
             new AllocationExpirationInfo(event.getContainerId(), true));
@@ -641,7 +639,7 @@ public class RMContainerImpl implements RMContainer {
         container.lastConfirmedResource = rmContainerResource;
         container.containerAllocationExpirer.unregister(
             new AllocationExpirationInfo(event.getContainerId()));
-        container.eventHandler.handle(new RMNodeDecreaseContainerEvent(
+        container.eventHandler.handle(new RMNodeUpdateContainerEvent(
             container.nodeId,
             Collections.singletonList(container.getContainer())));
       } else if (Resources.fitsIn(nmContainerResource, rmContainerResource)) {

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -144,7 +144,7 @@ public interface RMNode {
    * applications to clean up for this node.
    * @param response the {@link NodeHeartbeatResponse} to update
    */
-  public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
+  void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
 
   public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
 
@@ -169,9 +169,9 @@ public interface RMNode {
   public Set<String> getNodeLabels();
   
   /**
-   * Update containers to be decreased
+   * Update containers to be updated
    */
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
+  void updateNodeHeartbeatResponseForUpdatedContainers(
       NodeHeartbeatResponse response);
   
   public List<Container> pullNewlyIncreasedContainers();

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeEventType.java

@@ -42,7 +42,7 @@ public enum RMNodeEventType {
   // Source: Container
   CONTAINER_ALLOCATED,
   CLEANUP_CONTAINER,
-  DECREASE_CONTAINER,
+  UPDATE_CONTAINER,
 
   // Source: ClientRMService
   SIGNAL_CONTAINER,

+ 16 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -171,7 +171,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final List<ApplicationId> runningApplications =
       new ArrayList<ApplicationId>();
   
-  private final Map<ContainerId, Container> toBeDecreasedContainers =
+  private final Map<ContainerId, Container> toBeUpdatedContainers =
       new HashMap<>();
   
   private final Map<ContainerId, Container> nmReportedIncreasedContainers =
@@ -228,8 +228,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       .addTransition(NodeState.RUNNING, NodeState.RUNNING,
           RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
       .addTransition(NodeState.RUNNING, NodeState.RUNNING,
-          RMNodeEventType.DECREASE_CONTAINER,
-          new DecreaseContainersTransition())
+          RMNodeEventType.UPDATE_CONTAINER,
+          new UpdateContainersTransition())
       .addTransition(NodeState.RUNNING, NodeState.RUNNING,
           RMNodeEventType.SIGNAL_CONTAINER, new SignalContainerTransition())
       .addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
@@ -614,18 +614,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   };
   
   @VisibleForTesting
-  public Collection<Container> getToBeDecreasedContainers() {
-    return toBeDecreasedContainers.values(); 
+  public Collection<Container> getToBeUpdatedContainers() {
+    return toBeUpdatedContainers.values();
   }
   
   @Override
-  public void updateNodeHeartbeatResponseForContainersDecreasing(
+  public void updateNodeHeartbeatResponseForUpdatedContainers(
       NodeHeartbeatResponse response) {
     this.writeLock.lock();
     
     try {
-      response.addAllContainersToDecrease(toBeDecreasedContainers.values());
-      toBeDecreasedContainers.clear();
+      response.addAllContainersToUpdate(toBeUpdatedContainers.values());
+      toBeUpdatedContainers.clear();
     } finally {
       this.writeLock.unlock();
     }
@@ -1031,16 +1031,19 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
     }
   }
-  
-  public static class DecreaseContainersTransition
+
+  /**
+   * Transition to Update a container.
+   */
+  public static class UpdateContainersTransition
       implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
  
     @Override
     public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
-      RMNodeDecreaseContainerEvent de = (RMNodeDecreaseContainerEvent) event;
+      RMNodeUpdateContainerEvent de = (RMNodeUpdateContainerEvent) event;
 
-      for (Container c : de.getToBeDecreasedContainers()) {
-        rmNode.toBeDecreasedContainers.put(c.getId(), c);
+      for (Container c : de.getToBeUpdatedContainers()) {
+        rmNode.toBeUpdatedContainers.put(c.getId(), c);
       }
     }
   }

+ 15 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeDecreaseContainerEvent.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeUpdateContainerEvent.java

@@ -23,17 +23,22 @@ import java.util.List;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
 
-public class RMNodeDecreaseContainerEvent extends RMNodeEvent {
-  final List<Container> toBeDecreasedContainers;
+/**
+ * This class is used to create update container event
+ * for the containers running on a node.
+ *
+ */
+public class RMNodeUpdateContainerEvent extends RMNodeEvent {
+  private List<Container> toBeUpdatedContainers;
+
+  public RMNodeUpdateContainerEvent(NodeId nodeId,
+      List<Container> toBeUpdatedContainers) {
+    super(nodeId, RMNodeEventType.UPDATE_CONTAINER);
 
-  public RMNodeDecreaseContainerEvent(NodeId nodeId,
-      List<Container> toBeDecreasedContainers) {
-    super(nodeId, RMNodeEventType.DECREASE_CONTAINER);
-    
-    this.toBeDecreasedContainers = toBeDecreasedContainers;
+    this.toBeUpdatedContainers = toBeUpdatedContainers;
   }
-  
-  public List<Container> getToBeDecreasedContainers() {
-    return toBeDecreasedContainers;
+
+  public List<Container> getToBeUpdatedContainers() {
+    return toBeUpdatedContainers;
   }
 }

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -150,6 +150,10 @@ public abstract class AbstractYarnScheduler
    */
   protected final ReentrantReadWriteLock.WriteLock writeLock;
 
+  // If set to true, then ALL container updates will be automatically sent to
+  // the NM in the next heartbeat.
+  private boolean autoUpdateContainers = false;
+
   /**
    * Construct the service.
    *
@@ -178,6 +182,9 @@ public abstract class AbstractYarnScheduler
         configuredMaximumAllocationWaitTime);
     maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
     createReleaseCache();
+    autoUpdateContainers =
+        conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
+            YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS);
     super.serviceInit(conf);
   }
 
@@ -235,6 +242,10 @@ public abstract class AbstractYarnScheduler
     return nodeTracker.getNodes(nodeFilter);
   }
 
+  public boolean shouldContainersBeAutoUpdated() {
+    return this.autoUpdateContainers;
+  }
+
   @Override
   public Resource getClusterResource() {
     return nodeTracker.getClusterCapacity();

+ 28 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -74,8 +74,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode
-    .RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeUpdateContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
@@ -663,20 +662,38 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
           + " an updated container " + container.getId(), e);
       return null;
     }
-    
-    if (updateType == null ||
-        ContainerUpdateType.PROMOTE_EXECUTION_TYPE == updateType ||
-        ContainerUpdateType.DEMOTE_EXECUTION_TYPE == updateType) {
+
+    if (updateType == null) {
+      // This is a newly allocated container
       rmContainer.handle(new RMContainerEvent(
           rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
     } else {
-      rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
-          rmContainer.getContainerId(),
-          ContainerUpdateType.INCREASE_RESOURCE == updateType));
-      if (ContainerUpdateType.DECREASE_RESOURCE == updateType) {
+      // Resource increase is handled as follows:
+      // If the AM does not use the updated token to increase the container
+      // for a configured period of time, the RM will automatically rollback
+      // the update by performing a container decrease. This rollback (which
+      // essentially is another resource decrease update) is notified to the
+      // NM heartbeat response. If autoUpdate flag is set, then AM does not
+      // need to do anything - same code path as resource decrease.
+      //
+      // Resource Decrease is always automatic: the AM never has to do
+      // anything. It is always via NM heartbeat response.
+      //
+      // ExecutionType updates (both Promotion and Demotion) are either
+      // always automatic (if the flag is set) or the AM has to explicitly
+      // call updateContainer() on the NM. There is no expiry
+      boolean autoUpdate =
+          ContainerUpdateType.DECREASE_RESOURCE == updateType ||
+              ((AbstractYarnScheduler)rmContext.getScheduler())
+                  .shouldContainersBeAutoUpdated();
+      if (autoUpdate) {
         this.rmContext.getDispatcher().getEventHandler().handle(
-            new RMNodeDecreaseContainerEvent(rmContainer.getNodeId(),
+            new RMNodeUpdateContainerEvent(rmContainer.getNodeId(),
                 Collections.singletonList(rmContainer.getContainer())));
+      } else {
+        rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
+            rmContainer.getContainerId(),
+            ContainerUpdateType.INCREASE_RESOURCE == updateType));
       }
     }
     return container;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -242,7 +242,7 @@ public class MockNodes {
     }
 
     @Override
-    public void updateNodeHeartbeatResponseForContainersDecreasing(
+    public void updateNodeHeartbeatResponseForUpdatedContainers(
         NodeHeartbeatResponse response) {
       
     }

+ 168 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -43,11 +43,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
@@ -122,6 +124,21 @@ public class TestOpportunisticContainerAllocatorAMService {
     rm.start();
   }
 
+  public void createAndStartRMWithAutoUpdateContainer() {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+    rm = new MockRM(conf);
+    rm.start();
+  }
+
   @After
   public void stopRM() {
     if (rm != null) {
@@ -548,6 +565,157 @@ public class TestOpportunisticContainerAllocatorAMService {
     verifyMetrics(metrics, 7168, 7, 1024, 1, 1);
   }
 
+  @Test(timeout = 600000)
+  public void testContainerAutoUpdateContainer() throws Exception {
+    rm.stop();
+    createAndStartRMWithAutoUpdateContainer();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nm1.registerNode();
+
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    nm1.nodeHeartbeat(true);
+
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
+    OpportunisticContainerContext ctxt =
+        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+            .getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    nm1.nodeHeartbeat(true);
+    Thread.sleep(1000);
+
+    AllocateResponse allocateResponse = am1.allocate(Arrays.asList(
+        ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(1 * GB), 2, true, null,
+            ExecutionTypeRequest
+                .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
+    List<Container> allocatedContainers =
+        allocateResponse.getAllocatedContainers();
+    Assert.assertEquals(2, allocatedContainers.size());
+    Container container = allocatedContainers.get(0);
+    // Start Container in NM
+    nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+            ContainerState.RUNNING, "", 0)), true);
+    Thread.sleep(200);
+
+    // Verify that container is actually running wrt the RM..
+    RMContainer rmContainer = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(container.getId().getApplicationAttemptId())
+        .getRMContainer(container.getId());
+    Assert.assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
+    // Send Promotion req... this should result in update error
+    // Since the container doesn't exist anymore..
+    allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+        UpdateContainerRequest.newInstance(0, container.getId(),
+            ContainerUpdateType.PROMOTE_EXECUTION_TYPE, null,
+            ExecutionType.GUARANTEED)));
+
+    nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.OPPORTUNISTIC,
+            ContainerState.RUNNING, "", 0)), true);
+    Thread.sleep(200);
+    // Get the update response on next allocate
+    allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    // Check the update response from YARNRM
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    UpdatedContainer uc = allocateResponse.getUpdatedContainers().get(0);
+    Assert.assertEquals(container.getId(), uc.getContainer().getId());
+    Assert.assertEquals(ExecutionType.GUARANTEED,
+        uc.getContainer().getExecutionType());
+    // Check that the container is updated in NM through NM heartbeat response
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(1, response.getContainersToUpdate().size());
+    Container containersFromNM = response.getContainersToUpdate().get(0);
+    Assert.assertEquals(container.getId(), containersFromNM.getId());
+    Assert.assertEquals(ExecutionType.GUARANTEED,
+        containersFromNM.getExecutionType());
+
+    //Increase resources
+    allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+        UpdateContainerRequest.newInstance(1, container.getId(),
+            ContainerUpdateType.INCREASE_RESOURCE,
+            Resources.createResource(2 * GB, 1), null)));
+    response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.GUARANTEED,
+            ContainerState.RUNNING, "", 0)), true);
+
+    Thread.sleep(200);
+    if (allocateResponse.getUpdatedContainers().size() == 0) {
+      allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    }
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    uc = allocateResponse.getUpdatedContainers().get(0);
+    Assert.assertEquals(container.getId(), uc.getContainer().getId());
+    Assert.assertEquals(Resource.newInstance(2 * GB, 1),
+        uc.getContainer().getResource());
+
+    // Check that the container resources are increased in
+    // NM through NM heartbeat response
+    if (response.getContainersToUpdate().size() == 0) {
+      response = nm1.nodeHeartbeat(true);
+    }
+    Assert.assertEquals(1, response.getContainersToUpdate().size());
+    Assert.assertEquals(Resource.newInstance(2 * GB, 1),
+        response.getContainersToUpdate().get(0).getResource());
+
+    //Decrease resources
+    allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+        UpdateContainerRequest.newInstance(2, container.getId(),
+            ContainerUpdateType.DECREASE_RESOURCE,
+            Resources.createResource(1 * GB, 1), null)));
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+
+    // Check that the container resources are decreased
+    // in NM through NM heartbeat response
+    response = nm1.nodeHeartbeat(true);
+    Assert.assertEquals(1, response.getContainersToUpdate().size());
+    Assert.assertEquals(Resource.newInstance(1 * GB, 1),
+        response.getContainersToUpdate().get(0).getResource());
+
+    nm1.nodeHeartbeat(true);
+    // DEMOTE the container
+    allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
+        UpdateContainerRequest.newInstance(3, container.getId(),
+            ContainerUpdateType.DEMOTE_EXECUTION_TYPE, null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    response = nm1.nodeHeartbeat(Arrays.asList(ContainerStatus
+        .newInstance(container.getId(), ExecutionType.GUARANTEED,
+            ContainerState.RUNNING, "", 0)), true);
+    Thread.sleep(200);
+    if (allocateResponse.getUpdatedContainers().size() == 0) {
+      // Get the update response on next allocate
+      allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+    }
+    // Check the update response from YARNRM
+    Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
+    uc = allocateResponse.getUpdatedContainers().get(0);
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        uc.getContainer().getExecutionType());
+    // Check that the container is updated in NM through NM heartbeat response
+    if (response.getContainersToUpdate().size() == 0) {
+      response = nm1.nodeHeartbeat(true);
+    }
+    Assert.assertEquals(1, response.getContainersToUpdate().size());
+    Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+        response.getContainersToUpdate().get(0).getExecutionType());
+  }
+
   private void verifyMetrics(QueueMetrics metrics, long availableMB,
       int availableVirtualCores, long allocatedMB,
       int allocatedVirtualCores, int allocatedContainers) {

+ 2 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java

@@ -51,8 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerApplicationAttempt;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
@@ -60,11 +59,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestContainerResizing {
@@ -205,7 +202,7 @@ public class TestContainerResizing {
     RMNodeImpl rmNode =
         (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
     Collection<Container> decreasedContainers =
-        rmNode.getToBeDecreasedContainers();
+        rmNode.getToBeUpdatedContainers();
     boolean rmNodeReceivedDecreaseContainer = false;
     for (Container c : decreasedContainers) {
       if (c.getId().equals(containerId1)

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestIncreaseAllocationExpirer.java

@@ -319,7 +319,7 @@ public class TestIncreaseAllocationExpirer {
     verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 16 * GB);
     // Verify NM receives the decrease message (3G)
     List<Container> containersToDecrease =
-        nm1.nodeHeartbeat(true).getContainersToDecrease();
+        nm1.nodeHeartbeat(true).getContainersToUpdate();
     Assert.assertEquals(1, containersToDecrease.size());
     Assert.assertEquals(
         3 * GB, containersToDecrease.get(0).getResource().getMemorySize());
@@ -435,7 +435,7 @@ public class TestIncreaseAllocationExpirer {
             .getAllocatedResource().getMemorySize());
     // Verify NM receives 2 decrease message
     List<Container> containersToDecrease =
-        nm1.nodeHeartbeat(true).getContainersToDecrease();
+        nm1.nodeHeartbeat(true).getContainersToUpdate();
     Assert.assertEquals(2, containersToDecrease.size());
     // Sort the list to make sure containerId3 is the first
     Collections.sort(containersToDecrease);