浏览代码

YARN-1509. Make AMRMClient support send increase container request and get increased/decreased containers. (Meng Ding via wangda)

(cherry picked from commit 7ff280fca9af45b98cee2336e78803da46b0f8a5)
Wangda Tan 9 年之前
父节点
当前提交
875aec3177

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -192,6 +192,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1510. Make NMClient support change container resources. 
     (Meng Ding via wangda)
 
+    YARN-1509. Make AMRMClient support send increase container request and 
+    get increased/decreased containers. (Meng Ding via wangda)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -556,7 +556,8 @@ public class ApplicationMaster {
     appSubmitterUgi.addCredentials(credentials);
 
 
-    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+    AMRMClientAsync.AbstractCallbackHandler allocListener =
+        new RMCallbackHandler();
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
     amRMClient.init(conf);
     amRMClient.start();
@@ -731,7 +732,7 @@ public class ApplicationMaster {
   }
 
   @VisibleForTesting
-  class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+  class RMCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
     @SuppressWarnings("unchecked")
     @Override
     public void onContainersCompleted(List<ContainerStatus> completedContainers) {
@@ -834,6 +835,9 @@ public class ApplicationMaster {
       }
     }
 
+    @Override
+    public void onContainersResourceChanged(List<Container> containers) {}
+
     @Override
     public void onShutdownRequest() {
       done = true;

+ 23 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -285,7 +287,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * @param req Resource request
    */
   public abstract void addContainerRequest(T req);
-  
+
   /**
    * Remove previous container request. The previous container request may have 
    * already been sent to the ResourceManager. So even after the remove request 
@@ -294,7 +296,26 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * @param req Resource request
    */
   public abstract void removeContainerRequest(T req);
-  
+
+  /**
+   * Request container resource change before calling <code>allocate</code>.
+   * Any previous pending resource change request of the same container will be
+   * removed.
+   *
+   * Application that calls this method is expected to maintain the
+   * <code>Container</code>s that are returned from previous successful
+   * allocations or resource changes. By passing in the existing container and a
+   * target resource capability to this method, the application requests the
+   * ResourceManager to change the existing resource allocation to the target
+   * resource allocation.
+   *
+   * @param container The container returned from the last successful resource
+   *                  allocation or resource change
+   * @param capability  The target resource capability of the container
+   */
+  public abstract void requestContainerResourceChange(
+      Container container, Resource capability);
+
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use
    * the container or wants to give up the container then it can release them.

+ 156 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -56,11 +56,17 @@ import com.google.common.annotations.VisibleForTesting;
  * It should be used by implementing a CallbackHandler:
  * <pre>
  * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ * class MyCallbackHandler extends AMRMClientAsync.AbstractCallbackHandler {
  *   public void onContainersAllocated(List<Container> containers) {
  *     [run tasks on the containers]
  *   }
- *   
+ *
+ *   public void onContainersResourceChanged(List<Container> containers) {
+ *     [determine if resource allocation of containers has been increased in
+ *      the ResourceManager, and if so, inform the NodeManagers to increase the
+ *      resource monitor/enforcement on the containers]
+ *   }
+ *
  *   public void onContainersCompleted(List<ContainerStatus> statuses) {
  *     [update progress, check whether app is done]
  *   }
@@ -100,23 +106,80 @@ extends AbstractService {
   protected final CallbackHandler handler;
   protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
 
+  /**
+   * <p>Create a new instance of AMRMClientAsync.</p>
+   *
+   * @param intervalMs heartbeat interval in milliseconds between AM and RM
+   * @param callbackHandler callback handler that processes responses from
+   *                        the <code>ResourceManager</code>
+   */
+  public static <T extends ContainerRequest> AMRMClientAsync<T>
+      createAMRMClientAsync(
+      int intervalMs, AbstractCallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
+  }
+
+  /**
+   * <p>Create a new instance of AMRMClientAsync.</p>
+   *
+   * @param client the AMRMClient instance
+   * @param intervalMs heartbeat interval in milliseconds between AM and RM
+   * @param callbackHandler callback handler that processes responses from
+   *                        the <code>ResourceManager</code>
+   */
+  public static <T extends ContainerRequest> AMRMClientAsync<T>
+      createAMRMClientAsync(
+      AMRMClient<T> client, int intervalMs,
+      AbstractCallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
+  }
+
+  protected AMRMClientAsync(
+      int intervalMs, AbstractCallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
+      AbstractCallbackHandler callbackHandler) {
+    super(AMRMClientAsync.class.getName());
+    this.client = client;
+    this.heartbeatIntervalMs.set(intervalMs);
+    this.handler = callbackHandler;
+  }
+
+  /**
+   *
+   * @deprecated Use {@link #createAMRMClientAsync(int,
+   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
+   */
+  @Deprecated
   public static <T extends ContainerRequest> AMRMClientAsync<T>
       createAMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
     return new AMRMClientAsyncImpl<T>(intervalMs, callbackHandler);
   }
-  
+
+  /**
+   *
+   * @deprecated Use {@link #createAMRMClientAsync(AMRMClient,
+   *             int, AMRMClientAsync.AbstractCallbackHandler)} instead.
+   */
+  @Deprecated
   public static <T extends ContainerRequest> AMRMClientAsync<T>
       createAMRMClientAsync(AMRMClient<T> client, int intervalMs,
           CallbackHandler callbackHandler) {
     return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
   }
-  
+
+  @Deprecated
   protected AMRMClientAsync(int intervalMs, CallbackHandler callbackHandler) {
     this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
   }
   
   @Private
   @VisibleForTesting
+  @Deprecated
   protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
       CallbackHandler callbackHandler) {
     super(AMRMClientAsync.class.getName());
@@ -171,6 +234,25 @@ extends AbstractService {
    */
   public abstract void removeContainerRequest(T req);
 
+  /**
+   * Request container resource change before calling <code>allocate</code>.
+   * Any previous pending resource change request of the same container will be
+   * removed.
+   *
+   * Application that calls this method is expected to maintain the
+   * <code>Container</code>s that are returned from previous successful
+   * allocations or resource changes. By passing in the existing container and a
+   * target resource capability to this method, the application requests the
+   * ResourceManager to change the existing resource allocation to the target
+   * resource allocation.
+   *
+   * @param container The container returned from the last successful resource
+   *                  allocation or resource change
+   * @param capability  The target resource capability of the container
+   */
+  public abstract void requestContainerResourceChange(
+      Container container, Resource capability);
+
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use
    * the container or wants to give up the container then it can release them.
@@ -264,37 +346,95 @@ extends AbstractService {
     } while (true);
   }
 
+  /**
+   * <p>
+   * The callback abstract class. The callback functions need to be implemented
+   * by {@link AMRMClientAsync} users. The APIs are called when responses from
+   * the <code>ResourceManager</code> are available.
+   * </p>
+   */
+  public abstract static class AbstractCallbackHandler
+      implements CallbackHandler {
+
+    /**
+     * Called when the ResourceManager responds to a heartbeat with completed
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called before containersAllocated.
+     */
+    public abstract void onContainersCompleted(List<ContainerStatus> statuses);
+
+    /**
+     * Called when the ResourceManager responds to a heartbeat with allocated
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called after containersCompleted.
+     */
+    public abstract void onContainersAllocated(List<Container> containers);
+
+    /**
+     * Called when the ResourceManager responds to a heartbeat with containers
+     * whose resource allocation has been changed.
+     */
+    public abstract void onContainersResourceChanged(
+        List<Container> containers);
+
+    /**
+     * Called when the ResourceManager wants the ApplicationMaster to shutdown
+     * for being out of sync etc. The ApplicationMaster should not unregister
+     * with the RM unless the ApplicationMaster wants to be the last attempt.
+     */
+    public abstract void onShutdownRequest();
+
+    /**
+     * Called when nodes tracked by the ResourceManager have changed in health,
+     * availability etc.
+     */
+    public abstract void onNodesUpdated(List<NodeReport> updatedNodes);
+
+    public abstract float getProgress();
+
+    /**
+     * Called when error comes from RM communications as well as from errors in
+     * the callback itself from the app. Calling
+     * stop() is the recommended action.
+     */
+    public abstract void onError(Throwable e);
+  }
+
+  /**
+   * @deprecated Use {@link AMRMClientAsync.AbstractCallbackHandler} instead.
+   */
+  @Deprecated
   public interface CallbackHandler {
-    
+
     /**
      * Called when the ResourceManager responds to a heartbeat with completed
      * containers. If the response contains both completed containers and
      * allocated containers, this will be called before containersAllocated.
      */
-    public void onContainersCompleted(List<ContainerStatus> statuses);
-    
+    void onContainersCompleted(List<ContainerStatus> statuses);
+
     /**
      * Called when the ResourceManager responds to a heartbeat with allocated
      * containers. If the response containers both completed containers and
      * allocated containers, this will be called after containersCompleted.
      */
-    public void onContainersAllocated(List<Container> containers);
-    
+    void onContainersAllocated(List<Container> containers);
+
     /**
      * Called when the ResourceManager wants the ApplicationMaster to shutdown
      * for being out of sync etc. The ApplicationMaster should not unregister
      * with the RM unless the ApplicationMaster wants to be the last attempt.
      */
-    public void onShutdownRequest();
-    
+    void onShutdownRequest();
+
     /**
      * Called when nodes tracked by the ResourceManager have changed in health,
      * availability etc.
      */
-    public void onNodesUpdated(List<NodeReport> updatedNodes);
-    
-    public float getProgress();
-    
+    void onNodesUpdated(List<NodeReport> updatedNodes);
+
+    float getProgress();
+
     /**
      * Called when error comes from RM communications as well as from errors in
      * the callback itself from the app. Calling
@@ -302,6 +442,6 @@ extends AbstractService {
      *
      * @param e
      */
-    public void onError(Throwable e);
+    void onError(Throwable e);
   }
 }

+ 50 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.async.impl;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
@@ -66,13 +67,41 @@ extends AMRMClientAsync<T> {
   private volatile float progress;
   
   private volatile Throwable savedException;
-  
+
+  /**
+   *
+   * @param intervalMs heartbeat interval in milliseconds between AM and RM
+   * @param callbackHandler callback handler that processes responses from
+   *                        the <code>ResourceManager</code>
+   */
+  public AMRMClientAsyncImpl(
+      int intervalMs, AbstractCallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
+  }
+
+  public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
+      AbstractCallbackHandler callbackHandler) {
+    super(client, intervalMs, callbackHandler);
+    heartbeatThread = new HeartbeatThread();
+    handlerThread = new CallbackHandlerThread();
+    responseQueue = new LinkedBlockingQueue<>();
+    keepRunning = true;
+    savedException = null;
+  }
+
+  /**
+   *
+   * @deprecated Use {@link #AMRMClientAsyncImpl(int,
+   *             AMRMClientAsync.AbstractCallbackHandler)} instead.
+   */
+  @Deprecated
   public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
     this(new AMRMClientImpl<T>(), intervalMs, callbackHandler);
   }
-  
+
   @Private
   @VisibleForTesting
+  @Deprecated
   public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
       CallbackHandler callbackHandler) {
     super(client, intervalMs, callbackHandler);
@@ -82,7 +111,7 @@ extends AMRMClientAsync<T> {
     keepRunning = true;
     savedException = null;
   }
-    
+
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
@@ -177,6 +206,12 @@ extends AMRMClientAsync<T> {
     client.removeContainerRequest(req);
   }
 
+  @Override
+  public void requestContainerResourceChange(
+      Container container, Resource capability) {
+    client.requestContainerResourceChange(container, capability);
+  }
+
   /**
    * Release containers assigned by the Resource Manager. If the app cannot use
    * the container or wants to give up the container then it can release them.
@@ -300,6 +335,18 @@ extends AMRMClientAsync<T> {
             handler.onContainersCompleted(completed);
           }
 
+          if (handler instanceof AMRMClientAsync.AbstractCallbackHandler) {
+            // RM side of the implementation guarantees that there are
+            // no duplications between increased and decreased containers
+            List<Container> changed = new ArrayList<>();
+            changed.addAll(response.getIncreasedContainers());
+            changed.addAll(response.getDecreasedContainers());
+            if (!changed.isEmpty()) {
+              ((AMRMClientAsync.AbstractCallbackHandler) handler)
+                  .onContainersResourceChanged(changed);
+            }
+          }
+
           List<Container> allocated = response.getAllocatedContainers();
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);

+ 135 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -33,6 +33,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.AbstractMap.SimpleEntry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +50,9 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -72,6 +75,7 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 @Private
 @Unstable
@@ -110,8 +114,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
       containerRequests = new LinkedHashSet<T>();
     }
   }
-  
-  
+
+
   /**
    * Class compares Resource by memory then cpu in reverse order
    */
@@ -144,10 +148,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     int cpu0 = arg0.getVirtualCores();
     int cpu1 = arg1.getVirtualCores();
     
-    if(mem0 <= mem1 && cpu0 <= cpu1) { 
-      return true;
-    }
-    return false; 
+    return (mem0 <= mem1 && cpu0 <= cpu1);
   }
   
   //Key -> Priority
@@ -164,11 +165,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   protected final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
       new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
   protected final Set<ContainerId> release = new TreeSet<ContainerId>();
-  // pendingRelease holds history or release requests.request is removed only if
-  // RM sends completedContainer.
+  // pendingRelease holds history of release requests.
+  // request is removed only if RM sends completedContainer.
   // How it different from release? --> release is for per allocate() request.
   protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
-  
+  // change map holds container resource change requests between two allocate()
+  // calls, and is cleared after each successful allocate() call.
+  protected final Map<ContainerId, SimpleEntry<Container, Resource>> change =
+      new HashMap<>();
+  // pendingChange map holds history of container resource change requests in
+  // case AM needs to reregister with the ResourceManager.
+  // Change requests are removed from this map if RM confirms the change
+  // through allocate response, or if RM confirms that the container has been
+  // completed.
+  protected final Map<ContainerId, SimpleEntry<Container, Resource>>
+      pendingChange = new HashMap<>();
+
   public AMRMClientImpl() {
     super(AMRMClientImpl.class.getName());
   }
@@ -241,7 +253,8 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     AllocateRequest allocateRequest = null;
     List<String> blacklistToAdd = new ArrayList<String>();
     List<String> blacklistToRemove = new ArrayList<String>();
-    
+    Map<ContainerId, SimpleEntry<Container, Resource>> oldChange =
+        new HashMap<>();
     try {
       synchronized (this) {
         askList = new ArrayList<ResourceRequest>(ask.size());
@@ -252,10 +265,30 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
               r.getResourceName(), r.getCapability(), r.getNumContainers(),
               r.getRelaxLocality(), r.getNodeLabelExpression()));
         }
+        List<ContainerResourceChangeRequest> increaseList = new ArrayList<>();
+        List<ContainerResourceChangeRequest> decreaseList = new ArrayList<>();
+        // Save the current change for recovery
+        oldChange.putAll(change);
+        for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+            change.entrySet()) {
+          Container container = entry.getValue().getKey();
+          Resource original = container.getResource();
+          Resource target = entry.getValue().getValue();
+          if (Resources.fitsIn(target, original)) {
+            // This is a decrease request
+            decreaseList.add(ContainerResourceChangeRequest.newInstance(
+                container.getId(), target));
+          } else {
+            // This is an increase request
+            increaseList.add(ContainerResourceChangeRequest.newInstance(
+                container.getId(), target));
+          }
+        }
         releaseList = new ArrayList<ContainerId>(release);
         // optimistically clear this collection assuming no RPC failure
         ask.clear();
         release.clear();
+        change.clear();
 
         blacklistToAdd.addAll(blacklistAdditions);
         blacklistToRemove.addAll(blacklistRemovals);
@@ -266,8 +299,9 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         
         allocateRequest =
             AllocateRequest.newInstance(lastResponseId, progressIndicator,
-              askList, releaseList, blacklistRequest);
-        // clear blacklistAdditions and blacklistRemovals before 
+                askList, releaseList, blacklistRequest,
+                    increaseList, decreaseList);
+        // clear blacklistAdditions and blacklistRemovals before
         // unsynchronized part
         blacklistAdditions.clear();
         blacklistRemovals.clear();
@@ -289,6 +323,7 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
               }
             }
           }
+          change.putAll(this.pendingChange);
         }
         // re register with RM
         registerApplicationMaster();
@@ -312,6 +347,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           removePendingReleaseRequests(allocateResponse
               .getCompletedContainersStatuses());
         }
+        if (!pendingChange.isEmpty()) {
+          List<ContainerStatus> completed =
+              allocateResponse.getCompletedContainersStatuses();
+          List<Container> changed = new ArrayList<>();
+          changed.addAll(allocateResponse.getIncreasedContainers());
+          changed.addAll(allocateResponse.getDecreasedContainers());
+          // remove all pending change requests that belong to the completed
+          // containers
+          for (ContainerStatus status : completed) {
+            ContainerId containerId = status.getContainerId();
+            pendingChange.remove(containerId);
+          }
+          // remove all pending change requests that have been satisfied
+          if (!changed.isEmpty()) {
+            removePendingChangeRequests(changed);
+          }
+        }
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -333,7 +385,22 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
               ask.add(oldAsk);
             }
           }
-          
+          // change requests could have been added during the allocate call.
+          // Those are the newest requests which take precedence
+          // over requests cached in the oldChange map.
+          //
+          // Only insert entries from the cached oldChange map
+          // that do not exist in the current change map:
+          for (Map.Entry<ContainerId, SimpleEntry<Container, Resource>> entry :
+              oldChange.entrySet()) {
+            ContainerId oldContainerId = entry.getKey();
+            Container oldContainer = entry.getValue().getKey();
+            Resource oldResource = entry.getValue().getValue();
+            if (change.get(oldContainerId) == null) {
+              change.put(
+                  oldContainerId, new SimpleEntry<>(oldContainer, oldResource));
+            }
+          }
           blacklistAdditions.addAll(blacklistToAdd);
           blacklistRemovals.addAll(blacklistToRemove);
         }
@@ -349,6 +416,24 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
     }
   }
 
+  protected void removePendingChangeRequests(
+      List<Container> changedContainers) {
+    for (Container changedContainer : changedContainers) {
+      ContainerId containerId = changedContainer.getId();
+      if (pendingChange.get(containerId) == null) {
+        continue;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RM has confirmed changed resource allocation for "
+            + "container " + containerId + ". Current resource allocation:"
+            + changedContainer.getResource()
+            + ". Remove pending change request:"
+            + pendingChange.get(containerId).getValue());
+      }
+      pendingChange.remove(containerId);
+    }
+  }
+
   @Private
   @VisibleForTesting
   protected void populateNMTokens(List<NMToken> nmTokens) {
@@ -479,12 +564,32 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
         req.getCapability(), req);
   }
 
+  @Override
+  public synchronized void requestContainerResourceChange(
+      Container container, Resource capability) {
+    validateContainerResourceChangeRequest(
+        container.getId(), container.getResource(), capability);
+    if (change.get(container.getId()) == null) {
+      change.put(container.getId(),
+          new SimpleEntry<>(container, capability));
+    } else {
+      change.get(container.getId()).setValue(capability);
+    }
+    if (pendingChange.get(container.getId()) == null) {
+      pendingChange.put(container.getId(),
+          new SimpleEntry<>(container, capability));
+    } else {
+      pendingChange.get(container.getId()).setValue(capability);
+    }
+  }
+
   @Override
   public synchronized void releaseAssignedContainer(ContainerId containerId) {
     Preconditions.checkArgument(containerId != null,
         "ContainerId can not be null.");
     pendingRelease.add(containerId);
     release.add(containerId);
+    pendingChange.remove(containerId);
   }
   
   @Override
@@ -618,7 +723,23 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
           "Cannot specify node label with rack and node");
     }
   }
-  
+
+  private void validateContainerResourceChangeRequest(
+      ContainerId containerId, Resource original, Resource target) {
+    Preconditions.checkArgument(containerId != null,
+        "ContainerId cannot be null");
+    Preconditions.checkArgument(original != null,
+        "Original resource capability cannot be null");
+    Preconditions.checkArgument(!Resources.equals(Resources.none(), original)
+            && Resources.fitsIn(Resources.none(), original),
+        "Original resource capability must be greater than 0");
+    Preconditions.checkArgument(target != null,
+        "Target resource capability cannot be null");
+    Preconditions.checkArgument(!Resources.equals(Resources.none(), target)
+            && Resources.fitsIn(Resources.none(), target),
+        "Target resource capability must be greater than 0");
+  }
+
   private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
     // This code looks weird but is needed because of the following scenario.
     // A ResourceRequest is removed from the remoteRequestTable. A 0 container 

+ 64 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

@@ -74,12 +74,15 @@ public class TestAMRMClientAsync {
     List<ContainerStatus> completed1 = Arrays.asList(
         ContainerStatus.newInstance(newContainerId(0, 0, 0, 0),
             ContainerState.COMPLETE, "", 0));
-    List<Container> allocated1 = Arrays.asList(
+    List<Container> containers = Arrays.asList(
         Container.newInstance(null, null, null, null, null, null));
     final AllocateResponse response1 = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), allocated1, null);
+        new ArrayList<ContainerStatus>(), containers, null);
     final AllocateResponse response2 = createAllocateResponse(completed1,
         new ArrayList<Container>(), null);
+    final AllocateResponse response3 = createAllocateResponse(
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+        containers, containers, null);
     final AllocateResponse emptyResponse = createAllocateResponse(
         new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
 
@@ -91,15 +94,15 @@ public class TestAMRMClientAsync {
       public AllocateResponse answer(InvocationOnMock invocation)
           throws Throwable {
         secondHeartbeatSync.incrementAndGet();
-        while(heartbeatBlock.get()) {
-          synchronized(heartbeatBlock) {
+        while (heartbeatBlock.get()) {
+          synchronized (heartbeatBlock) {
             heartbeatBlock.wait();
           }
         }
         secondHeartbeatSync.incrementAndGet();
         return response2;
       }
-    }).thenReturn(emptyResponse);
+    }).thenReturn(response3).thenReturn(emptyResponse);
     when(client.registerApplicationMaster(anyString(), anyInt(), anyString()))
       .thenReturn(null);
     when(client.getAvailableResources()).thenAnswer(new Answer<Resource>() {
@@ -146,16 +149,22 @@ public class TestAMRMClientAsync {
       Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
       Thread.sleep(10);
     }
-    
+
     // wait for the completed containers from the second heartbeat's response
     while (callbackHandler.takeCompletedContainers() == null) {
       Thread.sleep(10);
     }
-    
+
+    // wait for the changed containers from the third heartbeat's response
+    while (callbackHandler.takeChangedContainers() == null) {
+      Thread.sleep(10);
+    }
+
     asyncClient.stop();
     
     Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
     Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
+    Assert.assertEquals(null, callbackHandler.takeChangedContainers());
   }
 
   @Test(timeout=10000)
@@ -397,6 +406,17 @@ public class TestAMRMClientAsync {
     return response;
   }
 
+  private AllocateResponse createAllocateResponse(
+      List<ContainerStatus> completed, List<Container> allocated,
+      List<Container> increased, List<Container> decreased,
+      List<NMToken> nmTokens) {
+    AllocateResponse response =
+        AllocateResponse.newInstance(0, completed, allocated,
+            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
+            increased, decreased);
+    return response;
+  }
+
   public static ContainerId newContainerId(int appId, int appAttemptId,
       long timestamp, int containerId) {
     ApplicationId applicationId = ApplicationId.newInstance(timestamp, appId);
@@ -405,9 +425,11 @@ public class TestAMRMClientAsync {
     return ContainerId.newContainerId(applicationAttemptId, containerId);
   }
 
-  private class TestCallbackHandler implements AMRMClientAsync.CallbackHandler {
+  private class TestCallbackHandler
+      extends AMRMClientAsync.AbstractCallbackHandler {
     private volatile List<ContainerStatus> completedContainers;
     private volatile List<Container> allocatedContainers;
+    private final List<Container> changedContainers = new ArrayList<>();
     Exception savedException = null;
     volatile boolean reboot = false;
     Object notifier = new Object();
@@ -425,7 +447,19 @@ public class TestAMRMClientAsync {
       }
       return ret;
     }
-    
+
+    public List<Container> takeChangedContainers() {
+      List<Container> ret = null;
+      synchronized (changedContainers) {
+        if (!changedContainers.isEmpty()) {
+          ret = new ArrayList<>(changedContainers);
+          changedContainers.clear();
+          changedContainers.notify();
+        }
+      }
+      return ret;
+    }
+
     public List<Container> takeAllocatedContainers() {
       List<Container> ret = allocatedContainers;
       if (ret == null) {
@@ -453,6 +487,22 @@ public class TestAMRMClientAsync {
       }
     }
 
+    @Override
+    public void onContainersResourceChanged(
+        List<Container> changed) {
+      synchronized (changedContainers) {
+        changedContainers.clear();
+        changedContainers.addAll(changed);
+        while (!changedContainers.isEmpty()) {
+          try {
+            changedContainers.wait();
+          } catch (InterruptedException ex) {
+            LOG.error("Interrupted during wait", ex);
+          }
+        }
+      }
+    }
+
     @Override
     public void onContainersAllocated(List<Container> containers) {
       allocatedContainers = containers;
@@ -494,7 +544,8 @@ public class TestAMRMClientAsync {
     }
   }
 
-  private class TestCallbackHandler2 implements AMRMClientAsync.CallbackHandler {
+  private class TestCallbackHandler2
+      extends AMRMClientAsync.AbstractCallbackHandler {
     Object notifier = new Object();
     @SuppressWarnings("rawtypes")
     AMRMClientAsync asynClient;
@@ -512,6 +563,9 @@ public class TestAMRMClientAsync {
     @Override
     public void onContainersAllocated(List<Container> containers) {}
 
+    @Override
+    public void onContainersResourceChanged(List<Container> containers) {}
+
     @Override
     public void onShutdownRequest() {}
 

+ 159 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -35,10 +35,12 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
@@ -73,6 +75,7 @@ import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
+import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -126,6 +129,8 @@ public class TestAMRMClient {
       rolling_interval_sec);
     conf.setLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, am_expire_ms);
     conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    // set the minimum allocation so that resource decrease can go under 1024
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
     yarnCluster.init(conf);
@@ -730,8 +735,160 @@ public class TestAMRMClient {
         new ContainerRequest(Resource.newInstance(1024, 1), null, null,
             Priority.UNDEFINED, true, "x && y"));
   }
-    
-  private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)  
+
+  @Test(timeout=60000)
+  public void testAMRMClientWithContainerResourceChange()
+      throws YarnException, IOException {
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.createAMRMClient();
+      Assert.assertNotNull(amClient);
+      // asserting we are using the singleton instance cache
+      Assert.assertSame(
+          NMTokenCache.getSingleton(), amClient.getNMTokenCache());
+      amClient.init(conf);
+      amClient.start();
+      assertEquals(STATE.STARTED, amClient.getServiceState());
+      // start am nm client
+      NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+      Assert.assertNotNull(nmClient);
+      // asserting we are using the singleton instance cache
+      Assert.assertSame(
+          NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+      nmClient.init(conf);
+      nmClient.start();
+      assertEquals(STATE.STARTED, nmClient.getServiceState());
+      // am rm client register the application master with RM
+      amClient.registerApplicationMaster("Host", 10000, "");
+      // allocate three containers and make sure they are in RUNNING state
+      List<Container> containers =
+          allocateAndStartContainers(amClient, nmClient, 3);
+      // perform container resource increase and decrease tests
+      doContainerResourceChange(amClient, containers);
+      // unregister and finish up the test
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
+
+  private List<Container> allocateAndStartContainers(
+      final AMRMClient<ContainerRequest> amClient, final NMClient nmClient,
+      int num) throws YarnException, IOException {
+    // set up allocation requests
+    for (int i = 0; i < num; ++i) {
+      amClient.addContainerRequest(
+          new ContainerRequest(capability, nodes, racks, priority));
+    }
+    // send allocation requests
+    amClient.allocate(0.1f);
+    // sleep so that the NM can heartbeat to the RM and trigger allocations
+    sleep(150);
+    // get allocations
+    AllocateResponse allocResponse = amClient.allocate(0.1f);
+    List<Container> containers = allocResponse.getAllocatedContainers();
+    Assert.assertEquals(num, containers.size());
+    // build container launch context
+    Credentials ts = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    // start a process long enough for increase/decrease action to take effect
+    ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource>emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+        new HashMap<String, ByteBuffer>(), securityTokens,
+        new HashMap<ApplicationAccessType, String>());
+    // start the containers and make sure they are in RUNNING state
+    try {
+      for (int i = 0; i < num; i++) {
+        Container container = containers.get(i);
+        nmClient.startContainer(container, clc);
+        // NodeManager may still need some time to get the stable
+        // container status
+        while (true) {
+          ContainerStatus status = nmClient.getContainerStatus(
+              container.getId(), container.getNodeId());
+          if (status.getState() == ContainerState.RUNNING) {
+            break;
+          }
+          sleep(100);
+        }
+      }
+    } catch (YarnException e) {
+      throw new AssertionError("Exception is not expected: " + e);
+    }
+    // sleep so that the NM can heartbeat to the RM and confirm container launch
+    sleep(200);
+    return containers;
+  }
+
+
+  private void doContainerResourceChange(
+      final AMRMClient<ContainerRequest> amClient, List<Container> containers)
+      throws YarnException, IOException {
+    Assert.assertEquals(3, containers.size());
+    // remember the container IDs
+    Container container1 = containers.get(0);
+    Container container2 = containers.get(1);
+    Container container3 = containers.get(2);
+    AMRMClientImpl<ContainerRequest> amClientImpl =
+        (AMRMClientImpl<ContainerRequest>) amClient;
+    Assert.assertEquals(0, amClientImpl.change.size());
+    // verify a newer request overwrites the older request for container1
+    amClientImpl.requestContainerResourceChange(
+        container1, Resource.newInstance(2048, 1));
+    amClientImpl.requestContainerResourceChange(
+        container1, Resource.newInstance(4096, 1));
+    Assert.assertEquals(Resource.newInstance(4096, 1),
+        amClientImpl.change.get(container1.getId()).getValue());
+    // verify new decrease request cancels old increase request for container1
+    amClientImpl.requestContainerResourceChange(
+        container1, Resource.newInstance(512, 1));
+    Assert.assertEquals(Resource.newInstance(512, 1),
+        amClientImpl.change.get(container1.getId()).getValue());
+    // request resource increase for container2
+    amClientImpl.requestContainerResourceChange(
+        container2, Resource.newInstance(2048, 1));
+    Assert.assertEquals(Resource.newInstance(2048, 1),
+        amClientImpl.change.get(container2.getId()).getValue());
+    // verify release request will cancel pending change requests for the same
+    // container
+    amClientImpl.requestContainerResourceChange(
+        container3, Resource.newInstance(2048, 1));
+    Assert.assertEquals(3, amClientImpl.pendingChange.size());
+    amClientImpl.releaseAssignedContainer(container3.getId());
+    Assert.assertEquals(2, amClientImpl.pendingChange.size());
+    // as of now: container1 asks to decrease to (512, 1)
+    //            container2 asks to increase to (2048, 1)
+    // send allocation requests
+    AllocateResponse allocResponse = amClient.allocate(0.1f);
+    Assert.assertEquals(0, amClientImpl.change.size());
+    // we should get decrease confirmation right away
+    List<Container> decreasedContainers =
+        allocResponse.getDecreasedContainers();
+    List<Container> increasedContainers =
+        allocResponse.getIncreasedContainers();
+    Assert.assertEquals(1, decreasedContainers.size());
+    Assert.assertEquals(0, increasedContainers.size());
+    // we should get increase allocation after the next NM's heartbeat to RM
+    sleep(150);
+    // get allocations
+    allocResponse = amClient.allocate(0.1f);
+    decreasedContainers =
+        allocResponse.getDecreasedContainers();
+    increasedContainers =
+        allocResponse.getIncreasedContainers();
+    Assert.assertEquals(1, increasedContainers.size());
+    Assert.assertEquals(0, decreasedContainers.size());
+  }
+
+  private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
       throws YarnException, IOException {
     // setup container request
     

+ 61 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientOnRMRestart.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 
@@ -90,15 +91,17 @@ public class TestAMRMClientOnRMRestart {
   }
 
   // Test does major 6 steps verification.
-  // Step-1 : AMRMClient send allocate request for 2 container requests
-  // Step-2 : 2 containers are allocated by RM.
-  // Step-3 : AM Send 1 containerRequest(cRequest3) and 1 releaseRequests to
+  // Step-1 : AMRMClient send allocate request for 3 container requests
+  // Step-2 : 3 containers are allocated by RM.
+  // Step-3 : AM Send 1 containerRequest(cRequest4) and 1 releaseRequests to
   // RM
+  // Step-3.5 : AM Send 1 container resource increase request to RM
   // Step-4 : On RM restart, AM(does not know RM is restarted) sends additional
-  // containerRequest(cRequest4) and blacklisted nodes.
+  // containerRequest(cRequest5) and blacklisted nodes.
   // Intern RM send resync command
-  // Step-5 : Allocater after resync command & new containerRequest(cRequest5)
-  // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
+  // Verify AM can recover increase request after resync
+  // Step-5 : Allocate after resync command & new containerRequest(cRequest6)
+  // Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6
   @Test(timeout = 60000)
   public void testAMRMClientResendsRequestsOnRMRestart() throws Exception {
 
@@ -132,8 +135,8 @@ public class TestAMRMClientOnRMRestart {
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     ugi.addTokenIdentifier(token.decodeIdentifier());
 
-    // Step-1 : AMRMClient send allocate request for 2 ContainerRequest
-    // cRequest1 = h1 and cRequest2 = h1,h2
+    // Step-1 : AMRMClient send allocate request for 3 ContainerRequest
+    // cRequest1 = h1, cRequest2 = h1,h2 and cRequest3 = h1
     // blacklisted nodes = h2
     AMRMClient<ContainerRequest> amClient = new MyAMRMClientImpl(rm1);
     amClient.init(conf);
@@ -148,6 +151,9 @@ public class TestAMRMClientOnRMRestart {
         createReq(1, 1024, new String[] { "h1", "h2" });
     amClient.addContainerRequest(cRequest2);
 
+    ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
+    amClient.addContainerRequest(cRequest3);
+
     List<String> blacklistAdditions = new ArrayList<String>();
     List<String> blacklistRemoval = new ArrayList<String>();
     blacklistAdditions.add("h2");
@@ -167,14 +173,14 @@ public class TestAMRMClientOnRMRestart {
     assertBlacklistAdditionsAndRemovals(1, 1, rm1);
 
     // Step-2 : NM heart beat is sent.
-    // On 2nd AM allocate request, RM allocates 2 containers to AM
+    // On 2nd AM allocate request, RM allocates 3 containers to AM
     nm1.nodeHeartbeat(true); // Node heartbeat
     dispatcher.await();
 
     allocateResponse = amClient.allocate(0.2f);
     dispatcher.await();
-    // 2 containers are allocated i.e for cRequest1 and cRequest2.
-    Assert.assertEquals("No of assignments must be 0", 2, allocateResponse
+    // 3 containers are allocated i.e for cRequest1, cRequest2 and cRequest3.
+    Assert.assertEquals("No of assignments must be 0", 3, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(0, 0, rm1);
     assertBlacklistAdditionsAndRemovals(0, 0, rm1);
@@ -184,6 +190,7 @@ public class TestAMRMClientOnRMRestart {
     // removed allocated container requests
     amClient.removeContainerRequest(cRequest1);
     amClient.removeContainerRequest(cRequest2);
+    amClient.removeContainerRequest(cRequest3);
 
     allocateResponse = amClient.allocate(0.2f);
     dispatcher.await();
@@ -193,8 +200,8 @@ public class TestAMRMClientOnRMRestart {
     assertBlacklistAdditionsAndRemovals(0, 0, rm1);
 
     // Step-3 : Send 1 containerRequest and 1 releaseRequests to RM
-    ContainerRequest cRequest3 = createReq(1, 1024, new String[] { "h1" });
-    amClient.addContainerRequest(cRequest3);
+    ContainerRequest cRequest4 = createReq(1, 1024, new String[] { "h1" });
+    amClient.addContainerRequest(cRequest4);
 
     int pendingRelease = 0;
     Iterator<Container> it = allocatedContainers.iterator();
@@ -205,11 +212,24 @@ public class TestAMRMClientOnRMRestart {
       break;// remove one container
     }
 
+    // Step-3.5 : Send 1 container resource increase request to RM
+    Container container = it.next();
+    ContainerId containerId = container.getId();
+    // Make sure that container is in RUNNING state before sending increase
+    // request
+    nm1.nodeHeartbeat(containerId.getApplicationAttemptId(),
+        containerId.getContainerId(), ContainerState.RUNNING);
+    amClient.requestContainerResourceChange(
+        container, Resource.newInstance(2048, 1));
+    it.remove();
+
     allocateResponse = amClient.allocate(0.3f);
     dispatcher.await();
     Assert.assertEquals("No of assignments must be 0", 0, allocateResponse
         .getAllocatedContainers().size());
     assertAsksAndReleases(3, pendingRelease, rm1);
+    // Verify there is one increase and zero decrease
+    assertChanges(1, 0, rm1);
     assertBlacklistAdditionsAndRemovals(0, 0, rm1);
     int completedContainer =
         allocateResponse.getCompletedContainersStatuses().size();
@@ -228,7 +248,13 @@ public class TestAMRMClientOnRMRestart {
 
     // new NM to represent NM re-register
     nm1 = new MockNM("h1:1234", 10240, rm2.getResourceTrackerService());
-    nm1.registerNode();
+    NMContainerStatus containerReport =
+        NMContainerStatus.newInstance(containerId, ContainerState.RUNNING,
+            Resource.newInstance(1024, 1), "recover container", 0,
+            Priority.newInstance(0), 0);
+    nm1.registerNode(Collections.singletonList(containerReport),
+        Collections.singletonList(
+            containerId.getApplicationAttemptId().getApplicationId()));
     nm1.nodeHeartbeat(true);
     dispatcher.await();
 
@@ -243,9 +269,9 @@ public class TestAMRMClientOnRMRestart {
       it.remove();
     }
 
-    ContainerRequest cRequest4 =
+    ContainerRequest cRequest5 =
         createReq(1, 1024, new String[] { "h1", "h2" });
-    amClient.addContainerRequest(cRequest4);
+    amClient.addContainerRequest(cRequest5);
 
     // Step-4 : On RM restart, AM(does not know RM is restarted) sends
     // additional
@@ -259,11 +285,13 @@ public class TestAMRMClientOnRMRestart {
     pendingRelease -= completedContainer;
 
     assertAsksAndReleases(4, pendingRelease, rm2);
+    // Verify there is one increase and zero decrease
+    assertChanges(1, 0, rm2);
     assertBlacklistAdditionsAndRemovals(2, 0, rm2);
 
-    ContainerRequest cRequest5 =
+    ContainerRequest cRequest6 =
         createReq(1, 1024, new String[] { "h1", "h2", "h3" });
-    amClient.addContainerRequest(cRequest5);
+    amClient.addContainerRequest(cRequest6);
 
     // Step-5 : Allocater after resync command
     allocateResponse = amClient.allocate(0.5f);
@@ -272,6 +300,8 @@ public class TestAMRMClientOnRMRestart {
         .getAllocatedContainers().size());
 
     assertAsksAndReleases(5, 0, rm2);
+    // Verify there are no increase or decrease requests any more
+    assertChanges(0, 0, rm2);
     assertBlacklistAdditionsAndRemovals(0, 0, rm2);
 
     int noAssignedContainer = 0;
@@ -289,7 +319,7 @@ public class TestAMRMClientOnRMRestart {
       Thread.sleep(1000);
     }
 
-    // Step-6 : RM allocates containers i.e cRequest3,cRequest4 and cRequest5
+    // Step-6 : RM allocates containers i.e cRequest4,cRequest5 and cRequest6
     Assert.assertEquals("Number of container should be 3", 3,
         noAssignedContainer);
 
@@ -519,6 +549,8 @@ public class TestAMRMClientOnRMRestart {
 
     List<ResourceRequest> lastAsk = null;
     List<ContainerId> lastRelease = null;
+    List<ContainerResourceChangeRequest> lastIncrease = null;
+    List<ContainerResourceChangeRequest> lastDecrease = null;
     List<String> lastBlacklistAdditions;
     List<String> lastBlacklistRemovals;
 
@@ -541,6 +573,8 @@ public class TestAMRMClientOnRMRestart {
       }
       lastAsk = ask;
       lastRelease = release;
+      lastIncrease = increaseRequests;
+      lastDecrease = decreaseRequests;
       lastBlacklistAdditions = blacklistAdditions;
       lastBlacklistRemovals = blacklistRemovals;
       return super.allocate(applicationAttemptId, askCopy, release,
@@ -647,6 +681,14 @@ public class TestAMRMClientOnRMRestart {
         rm.getMyFifoScheduler().lastRelease.size());
   }
 
+  private static void assertChanges(
+      int expectedIncrease, int expectedDecrease, MyResourceManager rm) {
+    Assert.assertEquals(
+        expectedIncrease, rm.getMyFifoScheduler().lastIncrease.size());
+    Assert.assertEquals(
+        expectedDecrease, rm.getMyFifoScheduler().lastDecrease.size());
+  }
+
   private ContainerRequest createReq(int priority, int memory, String[] hosts) {
     Resource capability = Resource.newInstance(memory, 1);
     Priority priorityOfContainer = Priority.newInstance(priority);