Bläddra i källkod

YARN-1510. Make NMClient support change container resources. (Meng Ding via wangda)

(cherry picked from commit c99925d6dd0235f0d27536f0bebd129e435688fb)
Wangda Tan 9 år sedan
förälder
incheckning
8a8ac50a5b

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -189,6 +189,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2729. Support script based NodeLabelsProvider Interface in Distributed Node Label
     Configuration Setup. (Naganarasimha G R via rohithsharmaks)
 
+    YARN-1510. Make NMClient support change container resources. 
+    (Meng Ding via wangda)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -858,8 +858,7 @@ public class ApplicationMaster {
   }
 
   @VisibleForTesting
-  static class NMCallbackHandler
-    implements NMClientAsync.CallbackHandler {
+  static class NMCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
 
     private ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentHashMap<ContainerId, Container>();
@@ -907,6 +906,10 @@ public class ApplicationMaster {
       }
     }
 
+    @Override
+    public void onContainerResourceIncreased(
+        ContainerId containerId, Resource resource) {}
+
     @Override
     public void onStartContainerError(ContainerId containerId, Throwable t) {
       LOG.error("Failed to start Container " + containerId);
@@ -926,6 +929,11 @@ public class ApplicationMaster {
       LOG.error("Failed to stop Container " + containerId);
       containers.remove(containerId);
     }
+
+    @Override
+    public void onIncreaseContainerResourceError(
+        ContainerId containerId, Throwable t) {}
+
   }
 
   /**

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java

@@ -86,6 +86,22 @@ public abstract class NMClient extends AbstractService {
       ContainerLaunchContext containerLaunchContext)
           throws YarnException, IOException;
 
+  /**
+   * <p>Increase the resource of a container.</p>
+   *
+   * <p>The <code>ApplicationMaster</code> or other applications that use the
+   * client must provide the details of the container, including the Id and
+   * the target resource encapsulated in the updated container token via
+   * {@link Container}.
+   * </p>
+   *
+   * @param container the container with updated token
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract void increaseContainerResource(Container container)
+      throws YarnException, IOException;
+
   /**
    * <p>Stop an started container.</p>
    *

+ 152 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
@@ -51,11 +52,16 @@ import com.google.common.annotations.VisibleForTesting;
  *
  * <pre>
  * {@code
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ * class MyCallbackHandler extends NMClientAsync.AbstractCallbackHandler {
  *   public void onContainerStarted(ContainerId containerId,
  *       Map<String, ByteBuffer> allServiceResponse) {
  *     [post process after the container is started, process the response]
  *   }
+
+ *   public void onContainerResourceIncreased(ContainerId containerId,
+ *       Resource resource) {
+ *     [post process after the container resource is increased]
+ *   }
  *
  *   public void onContainerStatusReceived(ContainerId containerId,
  *       ContainerStatus containerStatus) {
@@ -111,21 +117,58 @@ public abstract class NMClientAsync extends AbstractService {
   protected NMClient client;
   protected CallbackHandler callbackHandler;
 
+  public static NMClientAsync createNMClientAsync(
+      AbstractCallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
+  }
+
+  protected NMClientAsync(AbstractCallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  protected NMClientAsync(
+      String name, AbstractCallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  protected NMClientAsync(String name, NMClient client,
+      AbstractCallbackHandler callbackHandler) {
+    super(name);
+    this.setClient(client);
+    this.setCallbackHandler(callbackHandler);
+  }
+
+  /**
+   * @deprecated Use {@link #createNMClientAsync(AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   public static NMClientAsync createNMClientAsync(
       CallbackHandler callbackHandler) {
     return new NMClientAsyncImpl(callbackHandler);
   }
-  
+
+  /**
+   * @deprecated Use {@link #NMClientAsync(AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   protected NMClientAsync(CallbackHandler callbackHandler) {
     this (NMClientAsync.class.getName(), callbackHandler);
   }
 
+  /**
+   * @deprecated Use {@link #NMClientAsync(String, AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   protected NMClientAsync(String name, CallbackHandler callbackHandler) {
     this (name, new NMClientImpl(), callbackHandler);
   }
 
   @Private
   @VisibleForTesting
+  @Deprecated
   protected NMClientAsync(String name, NMClient client,
       CallbackHandler callbackHandler) {
     super(name);
@@ -136,6 +179,8 @@ public abstract class NMClientAsync extends AbstractService {
   public abstract void startContainerAsync(
       Container container, ContainerLaunchContext containerLaunchContext);
 
+  public abstract void increaseContainerResourceAsync(Container container);
+
   public abstract void stopContainerAsync(
       ContainerId containerId, NodeId nodeId);
 
@@ -159,6 +204,110 @@ public abstract class NMClientAsync extends AbstractService {
   }
 
   /**
+   * <p>
+   * The callback abstract class. The callback functions need to be implemented
+   * by {@link NMClientAsync} users. The APIs are called when responses from
+   * <code>NodeManager</code> are available.
+   * </p>
+   *
+   * <p>
+   * Once a callback happens, the users can chose to act on it in blocking or
+   * non-blocking manner. If the action on callback is done in a blocking
+   * manner, some of the threads performing requests on NodeManagers may get
+   * blocked depending on how many threads in the pool are busy.
+   * </p>
+   *
+   * <p>
+   * The implementation of the callback functions should not throw the
+   * unexpected exception. Otherwise, {@link NMClientAsync} will just
+   * catch, log and then ignore it.
+   * </p>
+   */
+  public abstract static class AbstractCallbackHandler
+      implements CallbackHandler {
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate its
+     * acceptance of the starting container request.
+     *
+     * @param containerId the Id of the container
+     * @param allServiceResponse a Map between the auxiliary service names and
+     *                           their outputs
+     */
+    public abstract void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds with the status
+     * of the container.
+     *
+     * @param containerId the Id of the container
+     * @param containerStatus the status of the container
+     */
+    public abstract void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate the
+     * container is stopped.
+     *
+     * @param containerId the Id of the container
+     */
+    public abstract void onContainerStopped(ContainerId containerId);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * starting a container.
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    public abstract void onStartContainerError(
+        ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate
+     * the container resource has been successfully increased.
+     *
+     * @param containerId the Id of the container
+     * @param resource the target resource of the container
+     */
+    public abstract void onContainerResourceIncreased(
+        ContainerId containerId, Resource resource);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * querying the status of a container.
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    public abstract void onGetContainerStatusError(
+        ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * increasing container resource.
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    public abstract void onIncreaseContainerResourceError(
+        ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * stopping a container.
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    public abstract void onStopContainerError(
+        ContainerId containerId, Throwable t);
+  }
+
+  /**
+   * @deprecated Use {@link NMClientAsync.AbstractCallbackHandler} instead.
+   *
    * <p>
    * The callback interface needs to be implemented by {@link NMClientAsync}
    * users. The APIs are called when responses from <code>NodeManager</code> are
@@ -178,6 +327,7 @@ public abstract class NMClientAsync extends AbstractService {
    * catch, log and then ignore it.
    * </p>
    */
+  @Deprecated
   public static interface CallbackHandler {
     /**
      * The API is called when <code>NodeManager</code> responds to indicate its

+ 123 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java

@@ -82,16 +82,46 @@ public class NMClientAsyncImpl extends NMClientAsync {
   protected ConcurrentMap<ContainerId, StatefulContainer> containers =
       new ConcurrentHashMap<ContainerId, StatefulContainer>();
 
+  public NMClientAsyncImpl(AbstractCallbackHandler callbackHandler) {
+    this(NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  public NMClientAsyncImpl(
+      String name, AbstractCallbackHandler callbackHandler) {
+    this(name, new NMClientImpl(), callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected NMClientAsyncImpl(String name, NMClient client,
+      AbstractCallbackHandler callbackHandler) {
+    super(name, client, callbackHandler);
+    this.client = client;
+    this.callbackHandler = callbackHandler;
+  }
+
+  /**
+   * @deprecated Use {@link
+   *             #NMClientAsyncImpl(NMClientAsync.AbstractCallbackHandler)}
+   *             instead.
+   */
+  @Deprecated
   public NMClientAsyncImpl(CallbackHandler callbackHandler) {
     this(NMClientAsync.class.getName(), callbackHandler);
   }
 
+  /**
+   * @deprecated Use {@link #NMClientAsyncImpl(String,
+   *             NMClientAsync.AbstractCallbackHandler)} instead.
+   */
+  @Deprecated
   public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
     this(name, new NMClientImpl(), callbackHandler);
   }
 
   @Private
   @VisibleForTesting
+  @Deprecated
   protected NMClientAsyncImpl(String name, NMClient client,
       CallbackHandler callbackHandler) {
     super(name, client, callbackHandler);
@@ -229,6 +259,29 @@ public class NMClientAsyncImpl extends NMClientAsync {
     }
   }
 
+  public void increaseContainerResourceAsync(Container container) {
+    if (!(callbackHandler instanceof AbstractCallbackHandler)) {
+      LOG.error("Callback handler does not implement container resource "
+              + "increase callback methods");
+      return;
+    }
+    AbstractCallbackHandler handler = (AbstractCallbackHandler) callbackHandler;
+    if (containers.get(container.getId()) == null) {
+      handler.onIncreaseContainerResourceError(
+          container.getId(),
+          RPCUtil.getRemoteException(
+              "Container " + container.getId() +
+                  " is neither started nor scheduled to start"));
+    }
+    try {
+      events.put(new IncreaseContainerResourceEvent(container));
+    } catch (InterruptedException e) {
+      LOG.warn("Exception when scheduling the event of increasing resource of "
+          + "Container " + container.getId());
+      handler.onIncreaseContainerResourceError(container.getId(), e);
+    }
+  }
+
   public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
     if (containers.get(containerId) == null) {
       callbackHandler.onStopContainerError(containerId,
@@ -276,7 +329,8 @@ public class NMClientAsyncImpl extends NMClientAsync {
   protected static enum ContainerEventType {
     START_CONTAINER,
     STOP_CONTAINER,
-    QUERY_CONTAINER
+    QUERY_CONTAINER,
+    INCREASE_CONTAINER_RESOURCE
   }
 
   protected static class ContainerEvent
@@ -327,6 +381,21 @@ public class NMClientAsyncImpl extends NMClientAsync {
     }
   }
 
+  protected static class IncreaseContainerResourceEvent extends ContainerEvent {
+    private Container container;
+
+    public IncreaseContainerResourceEvent(Container container) {
+      super(container.getId(), container.getNodeId(),
+          container.getContainerToken(),
+              ContainerEventType.INCREASE_CONTAINER_RESOURCE);
+      this.container = container;
+    }
+
+    public Container getContainer() {
+      return container;
+    }
+  }
+
   protected static class StatefulContainer implements
       EventHandler<ContainerEvent> {
 
@@ -344,7 +413,9 @@ public class NMClientAsyncImpl extends NMClientAsync {
                 ContainerEventType.STOP_CONTAINER, new OutOfOrderTransition())
 
             // Transitions from RUNNING state
-            // RUNNING -> RUNNING should be the invalid transition
+            .addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
+                ContainerEventType.INCREASE_CONTAINER_RESOURCE,
+                new IncreaseContainerResourceTransition())
             .addTransition(ContainerState.RUNNING,
                 EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
                 ContainerEventType.STOP_CONTAINER,
@@ -353,12 +424,14 @@ public class NMClientAsyncImpl extends NMClientAsync {
             // Transition from DONE state
             .addTransition(ContainerState.DONE, ContainerState.DONE,
                 EnumSet.of(ContainerEventType.START_CONTAINER,
-                    ContainerEventType.STOP_CONTAINER))
+                    ContainerEventType.STOP_CONTAINER,
+                    ContainerEventType.INCREASE_CONTAINER_RESOURCE))
 
             // Transition from FAILED state
             .addTransition(ContainerState.FAILED, ContainerState.FAILED,
                 EnumSet.of(ContainerEventType.START_CONTAINER,
-                    ContainerEventType.STOP_CONTAINER));
+                    ContainerEventType.STOP_CONTAINER,
+                    ContainerEventType.INCREASE_CONTAINER_RESOURCE));
 
     protected static class StartContainerTransition implements
         MultipleArcTransition<StatefulContainer, ContainerEvent,
@@ -410,6 +483,52 @@ public class NMClientAsyncImpl extends NMClientAsync {
       }
     }
 
+    protected static class IncreaseContainerResourceTransition implements
+        SingleArcTransition<StatefulContainer, ContainerEvent> {
+      @Override
+      public void transition(
+          StatefulContainer container, ContainerEvent event) {
+        if (!(container.nmClientAsync.getCallbackHandler()
+            instanceof AbstractCallbackHandler)) {
+          LOG.error("Callback handler does not implement container resource "
+              + "increase callback methods");
+          return;
+        }
+        AbstractCallbackHandler handler =
+            (AbstractCallbackHandler) container.nmClientAsync
+                .getCallbackHandler();
+        try {
+          if (!(event instanceof IncreaseContainerResourceEvent)) {
+            throw new AssertionError("Unexpected event type. Expecting:"
+                + "IncreaseContainerResourceEvent. Got:" + event);
+          }
+          IncreaseContainerResourceEvent increaseEvent =
+              (IncreaseContainerResourceEvent) event;
+          container.nmClientAsync.getClient().increaseContainerResource(
+              increaseEvent.getContainer());
+          try {
+            handler.onContainerResourceIncreased(
+                increaseEvent.getContainerId(), increaseEvent.getContainer()
+                    .getResource());
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from "
+                + "onContainerResourceIncreased for Container "
+                + event.getContainerId(), thr);
+          }
+        } catch (Exception e) {
+          try {
+            handler.onIncreaseContainerResourceError(event.getContainerId(), e);
+          } catch (Throwable thr) {
+            // Don't process user created unchecked exception
+            LOG.info("Unchecked exception is thrown from "
+                + "onIncreaseContainerResourceError for Container "
+                + event.getContainerId(), thr);
+          }
+        }
+      }
+    }
+
     protected static class StopContainerTransition implements
         MultipleArcTransition<StatefulContainer, ContainerEvent,
         ContainerState> {

+ 32 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java

@@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -147,8 +149,7 @@ public class NMClientImpl extends NMClient {
     private ContainerState state;
     
     
-    public StartedContainer(ContainerId containerId, NodeId nodeId,
-        Token containerToken) {
+    public StartedContainer(ContainerId containerId, NodeId nodeId) {
       this.containerId = containerId;
       this.nodeId = nodeId;
       state = ContainerState.NEW;
@@ -231,6 +232,34 @@ public class NMClientImpl extends NMClient {
     }
   }
 
+  @Override
+  public void increaseContainerResource(Container container)
+      throws YarnException, IOException {
+    ContainerManagementProtocolProxyData proxy = null;
+    try {
+      proxy = cmProxy.getProxy(
+          container.getNodeId().toString(), container.getId());
+      List<Token> increaseTokens = new ArrayList<>();
+      increaseTokens.add(container.getContainerToken());
+      IncreaseContainersResourceRequest increaseRequest =
+          IncreaseContainersResourceRequest
+              .newInstance(increaseTokens);
+      IncreaseContainersResourceResponse response =
+          proxy.getContainerManagementProtocol()
+              .increaseContainersResource(increaseRequest);
+      if (response.getFailedRequests() != null
+          && response.getFailedRequests().containsKey(container.getId())) {
+        Throwable t = response.getFailedRequests().get(container.getId())
+            .deSerialize();
+        parseAndThrowException(t);
+      }
+    } finally {
+      if (proxy != null) {
+        cmProxy.mayBeCloseProxy(proxy);
+      }
+    }
+  }
+
   @Override
   public void stopContainer(ContainerId containerId, NodeId nodeId)
       throws YarnException, IOException {
@@ -308,7 +337,7 @@ public class NMClientImpl extends NMClient {
   protected synchronized StartedContainer createStartedContainer(
       Container container) throws YarnException, IOException {
     StartedContainer startedContainer = new StartedContainer(container.getId(),
-        container.getNodeId(), container.getContainerToken());
+        container.getNodeId());
     return startedContainer;
   }
 

+ 87 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java

@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -116,6 +116,10 @@ public class TestNMClientAsync {
           recordFactory.newRecordInstance(ContainerLaunchContext.class);
       asyncClient.startContainerAsync(container, clc);
     }
+    while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
+        .isIncreaseResourceFailureCallsExecuted()) {
+      Thread.sleep(10);
+    }
     while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
         .isStopFailureCallsExecuted()) {
       Thread.sleep(10);
@@ -183,7 +187,7 @@ public class TestNMClientAsync {
   }
 
   private class TestCallbackHandler1
-      implements NMClientAsync.CallbackHandler {
+      extends NMClientAsync.AbstractCallbackHandler {
 
     private boolean path = true;
 
@@ -196,6 +200,10 @@ public class TestNMClientAsync {
     private AtomicInteger actualQueryFailure = new AtomicInteger(0);
     private AtomicInteger actualStopSuccess = new AtomicInteger(0);
     private AtomicInteger actualStopFailure = new AtomicInteger(0);
+    private AtomicInteger actualIncreaseResourceSuccess =
+        new AtomicInteger(0);
+    private AtomicInteger actualIncreaseResourceFailure =
+        new AtomicInteger(0);
 
     private AtomicIntegerArray actualStartSuccessArray;
     private AtomicIntegerArray actualStartFailureArray;
@@ -203,6 +211,8 @@ public class TestNMClientAsync {
     private AtomicIntegerArray actualQueryFailureArray;
     private AtomicIntegerArray actualStopSuccessArray;
     private AtomicIntegerArray actualStopFailureArray;
+    private AtomicIntegerArray actualIncreaseResourceSuccessArray;
+    private AtomicIntegerArray actualIncreaseResourceFailureArray;
 
     private Set<String> errorMsgs =
         Collections.synchronizedSet(new HashSet<String>());
@@ -217,6 +227,10 @@ public class TestNMClientAsync {
       actualQueryFailureArray = new AtomicIntegerArray(expectedFailure);
       actualStopSuccessArray = new AtomicIntegerArray(expectedSuccess);
       actualStopFailureArray = new AtomicIntegerArray(expectedFailure);
+      actualIncreaseResourceSuccessArray =
+          new AtomicIntegerArray(expectedSuccess);
+      actualIncreaseResourceFailureArray =
+          new AtomicIntegerArray(expectedFailure);
     }
 
     @SuppressWarnings("deprecation")
@@ -236,7 +250,11 @@ public class TestNMClientAsync {
         asyncClient.getContainerStatusAsync(containerId, nodeId);
       } else {
         // move on to the following failure tests
-        asyncClient.stopContainerAsync(containerId, nodeId);
+        // make sure we pass in the container with the same
+        // containerId
+        Container container = Container.newInstance(
+            containerId, nodeId, null, null, null, containerToken);
+        asyncClient.increaseContainerResourceAsync(container);
       }
 
       // Shouldn't crash the test thread
@@ -255,12 +273,33 @@ public class TestNMClientAsync {
       actualQuerySuccess.addAndGet(1);
       actualQuerySuccessArray.set(containerId.getId(), 1);
       // move on to the following success tests
-      asyncClient.stopContainerAsync(containerId, nodeId);
+      // make sure we pass in the container with the same
+      // containerId
+      Container container = Container.newInstance(
+          containerId, nodeId, null, null, null, containerToken);
+      asyncClient.increaseContainerResourceAsync(container);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onContainerResourceIncreased(
+        ContainerId containerId, Resource resource) {
+      if (containerId.getId() >= expectedSuccess) {
+        errorMsgs.add("Container " + containerId +
+            " should throw the exception onContainerResourceIncreased");
+        return;
+      }
+      actualIncreaseResourceSuccess.addAndGet(1);
+      actualIncreaseResourceSuccessArray.set(containerId.getId(), 1);
+      // move on to the following success tests
+      asyncClient.stopContainerAsync(containerId, nodeId);
+      // throw a fake user exception, and shouldn't crash the test
+      throw new RuntimeException("Ignorable Exception");
+    }
+
     @SuppressWarnings("deprecation")
     @Override
     public void onContainerStopped(ContainerId containerId) {
@@ -300,6 +339,26 @@ public class TestNMClientAsync {
       throw new RuntimeException("Ignorable Exception");
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public void onIncreaseContainerResourceError(
+        ContainerId containerId, Throwable t) {
+      if (containerId.getId() < expectedSuccess + expectedFailure) {
+        errorMsgs.add("Container " + containerId +
+            " shouldn't throw the exception onIncreaseContainerResourceError");
+        return;
+      }
+      actualIncreaseResourceFailure.addAndGet(1);
+      actualIncreaseResourceFailureArray.set(
+          containerId.getId() - expectedSuccess - expectedFailure, 1);
+      // increase container resource error should NOT change the
+      // the container status to FAILED
+      // move on to the following failure tests
+      asyncClient.stopContainerAsync(containerId, nodeId);
+      // Shouldn't crash the test thread
+      throw new RuntimeException("Ignorable Exception");
+    }
+
     @SuppressWarnings("deprecation")
     @Override
     public void onStopContainerError(ContainerId containerId, Throwable t) {
@@ -345,10 +404,12 @@ public class TestNMClientAsync {
       boolean isAllSuccessCallsExecuted =
           actualStartSuccess.get() == expectedSuccess &&
           actualQuerySuccess.get() == expectedSuccess &&
+          actualIncreaseResourceSuccess.get() == expectedSuccess &&
           actualStopSuccess.get() == expectedSuccess;
       if (isAllSuccessCallsExecuted) {
         assertAtomicIntegerArray(actualStartSuccessArray);
         assertAtomicIntegerArray(actualQuerySuccessArray);
+        assertAtomicIntegerArray(actualIncreaseResourceSuccessArray);
         assertAtomicIntegerArray(actualStopSuccessArray);
       }
       return isAllSuccessCallsExecuted;
@@ -365,6 +426,15 @@ public class TestNMClientAsync {
       return isStartAndQueryFailureCallsExecuted;
     }
 
+    public boolean isIncreaseResourceFailureCallsExecuted() {
+      boolean isIncreaseResourceFailureCallsExecuted =
+          actualIncreaseResourceFailure.get() == expectedFailure;
+      if (isIncreaseResourceFailureCallsExecuted) {
+        assertAtomicIntegerArray(actualIncreaseResourceFailureArray);
+      }
+      return isIncreaseResourceFailureCallsExecuted;
+    }
+
     public boolean isStopFailureCallsExecuted() {
       boolean isStopFailureCallsExecuted =
           actualStopFailure.get() == expectedFailure;
@@ -392,6 +462,8 @@ public class TestNMClientAsync {
         when(client.getContainerStatus(any(ContainerId.class),
             any(NodeId.class))).thenReturn(
                 recordFactory.newRecordInstance(ContainerStatus.class));
+        doNothing().when(client).increaseContainerResource(
+            any(Container.class));
         doNothing().when(client).stopContainer(any(ContainerId.class),
             any(NodeId.class));
         break;
@@ -411,6 +483,8 @@ public class TestNMClientAsync {
         when(client.getContainerStatus(any(ContainerId.class),
             any(NodeId.class))).thenReturn(
                 recordFactory.newRecordInstance(ContainerStatus.class));
+        doThrow(RPCUtil.getRemoteException("Increase Resource Exception"))
+            .when(client).increaseContainerResource(any(Container.class));
         doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
             .stopContainer(any(ContainerId.class), any(NodeId.class));
     }
@@ -493,7 +567,7 @@ public class TestNMClientAsync {
   }
 
   private class TestCallbackHandler2
-      implements NMClientAsync.CallbackHandler {
+      extends NMClientAsync.AbstractCallbackHandler {
     private CyclicBarrier barrierC;
     private AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
 
@@ -511,6 +585,10 @@ public class TestNMClientAsync {
         ContainerStatus containerStatus) {
     }
 
+    @Override
+    public void onContainerResourceIncreased(
+        ContainerId containerId, Resource resource) {}
+
     @Override
     public void onContainerStopped(ContainerId containerId) {
     }
@@ -536,10 +614,13 @@ public class TestNMClientAsync {
         Throwable t) {
     }
 
+    @Override
+    public void onIncreaseContainerResourceError(
+        ContainerId containerId, Throwable t) {}
+
     @Override
     public void onStopContainerError(ContainerId containerId, Throwable t) {
     }
-
   }
 
   private Container mockContainer(int i) {

+ 29 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

@@ -210,10 +210,10 @@ public class TestNMClient {
     testContainerManagement(nmClient, allocateContainers(rmClient, 5));
 
     rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-                                         null, null);
+        null, null);
     // don't stop the running containers
     stopNmClient(false);
-    assertFalse(nmClient.startedContainers. isEmpty());
+    assertFalse(nmClient.startedContainers.isEmpty());
     //now cleanup
     nmClient.cleanupRunningContainers();
     assertEquals(0, nmClient.startedContainers.size());
@@ -298,6 +298,16 @@ public class TestNMClient {
             e.getMessage().contains("is not handled by this NodeManager"));
       }
 
+      // increaseContainerResource shouldn't be called before startContainer,
+      // otherwise, NodeManager cannot find the container
+      try {
+        nmClient.increaseContainerResource(container);
+        fail("Exception is expected");
+      } catch (YarnException e) {
+        assertTrue("The thrown exception is not expected",
+            e.getMessage().contains("is not handled by this NodeManager"));
+      }
+
       // stopContainer shouldn't be called before startContainer,
       // otherwise, an exception will be thrown
       try {
@@ -332,6 +342,8 @@ public class TestNMClient {
         // NodeManager may still need some time to make the container started
         testGetContainerStatus(container, i, ContainerState.RUNNING, "",
             Arrays.asList(new Integer[] {-1000}));
+        // Test increase container API and make sure requests can reach NM
+        testIncreaseContainerResource(container);
 
         try {
           nmClient.stopContainer(container.getId(), container.getNodeId());
@@ -397,4 +409,19 @@ public class TestNMClient {
     }
   }
 
+  private void testIncreaseContainerResource(Container container)
+    throws YarnException, IOException {
+    try {
+      nmClient.increaseContainerResource(container);
+    } catch (YarnException e) {
+      // NM container will only be in LOCALIZED state, so expect the increase
+      // action to fail.
+      if (!e.getMessage().contains(
+          "can only be changed when a container is in RUNNING state")) {
+        throw (AssertionError)
+            (new AssertionError("Exception is not expected: " + e)
+                .initCause(e));
+      }
+    }
+  }
 }