Browse Source

YARN-834. Fixed annotations for yarn-client module, reorganized packages and clearly differentiated *Async apis. Contributed by Arun C Murthy and Zhijie Shen.
svn merge --ignore-ancestry -c 1494017 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1494019 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli 12 năm trước cách đây
mục cha
commit
5acd4d6502
34 tập tin đã thay đổi với 945 bổ sung382 xóa
  1. 146 14
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
  2. 7 4
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
  3. 4 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
  4. 4 2
      hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java
  5. 4 0
      hadoop-yarn-project/CHANGES.txt
  6. 9 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  7. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
  8. 1 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java
  9. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
  10. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
  11. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java
  12. 245 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
  13. 235 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
  14. 12 93
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
  15. 24 166
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
  16. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java
  17. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java
  18. 30 28
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java
  19. 15 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
  20. 14 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java
  21. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java
  22. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java
  23. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java
  24. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java
  25. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java
  26. 5 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java
  27. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java
  28. 8 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
  29. 28 23
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
  30. 6 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
  31. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java
  32. 11 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
  33. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java
  34. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

+ 146 - 14
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java

@@ -20,9 +20,12 @@ package org.apache.hadoop.mapred;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -40,34 +43,85 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 
-public class ResourceMgrDelegate extends YarnClientImpl {
+import com.google.common.annotations.VisibleForTesting;
+
+public class ResourceMgrDelegate extends YarnClient {
   private static final Log LOG = LogFactory.getLog(ResourceMgrDelegate.class);
       
   private YarnConfiguration conf;
   private GetNewApplicationResponse application;
   private ApplicationId applicationId;
+  @Private
+  @VisibleForTesting
+  protected YarnClient client;
+  private InetSocketAddress rmAddress;
 
   /**
-   * Delegate responsible for communicating with the Resource Manager's {@link ApplicationClientProtocol}.
+   * Delegate responsible for communicating with the Resource Manager's
+   * {@link ApplicationClientProtocol}.
    * @param conf the configuration object.
    */
   public ResourceMgrDelegate(YarnConfiguration conf) {
-    super();
+    this(conf, null);
+  }
+
+  /**
+   * Delegate responsible for communicating with the Resource Manager's
+   * {@link ApplicationClientProtocol}.
+   * @param conf the configuration object.
+   * @param rmAddress the address of the Resource Manager
+   */
+  public ResourceMgrDelegate(YarnConfiguration conf,
+      InetSocketAddress rmAddress) {
+    super(ResourceMgrDelegate.class.getName());
     this.conf = conf;
+    this.rmAddress = rmAddress;
+    if (rmAddress == null) {
+      client = YarnClient.createYarnClient();
+    } else {
+      client = YarnClient.createYarnClient(rmAddress);
+    }
     init(conf);
     start();
   }
 
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (rmAddress == null) {
+      this.rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_PORT);
+    }
+    client.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    client.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    client.stop();
+    super.serviceStop();
+  }
+
   public TaskTrackerInfo[] getActiveTrackers() throws IOException,
       InterruptedException {
     try {
-      return TypeConverter.fromYarnNodes(super.getNodeReports());
+      return TypeConverter.fromYarnNodes(client.getNodeReports());
     } catch (YarnException e) {
       throw new IOException(e);
     }
@@ -75,7 +129,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
 
   public JobStatus[] getAllJobs() throws IOException, InterruptedException {
     try {
-      return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf);
+      return TypeConverter.fromYarnApps(client.getApplicationList(), this.conf);
     } catch (YarnException e) {
       throw new IOException(e);
     }
@@ -91,7 +145,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
   public ClusterMetrics getClusterMetrics() throws IOException,
       InterruptedException {
     try {
-      YarnClusterMetrics metrics = super.getYarnClusterMetrics();
+      YarnClusterMetrics metrics = client.getYarnClusterMetrics();
       ClusterMetrics oldMetrics =
           new ClusterMetrics(1, 1, 1, 1, 1, 1,
               metrics.getNumNodeManagers() * 10,
@@ -112,7 +166,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
       InterruptedException {
     try {
       return ProtoUtils.convertFromProtoFormat(
-        super.getRMDelegationToken(renewer), rmAddress);
+          client.getRMDelegationToken(renewer), rmAddress);
     } catch (YarnException e) {
       throw new IOException(e);
     }
@@ -124,7 +178,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
 
   public JobID getNewJobID() throws IOException, InterruptedException {
     try {
-      this.application = super.getNewApplication();
+      this.application = client.getNewApplication();
       this.applicationId = this.application.getApplicationId();
       return TypeConverter.fromYarn(applicationId);
     } catch (YarnException e) {
@@ -136,7 +190,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
   InterruptedException {
     try {
       org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
-          super.getQueueInfo(queueName);
+          client.getQueueInfo(queueName);
       return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo,
           conf);
     } catch (YarnException e) {
@@ -147,7 +201,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,
       InterruptedException {
     try {
-      return TypeConverter.fromYarnQueueUserAclsInfo(super
+      return TypeConverter.fromYarnQueueUserAclsInfo(client
         .getQueueAclsInfo());
     } catch (YarnException e) {
       throw new IOException(e);
@@ -156,7 +210,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
 
   public QueueInfo[] getQueues() throws IOException, InterruptedException {
     try {
-      return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf);
+      return TypeConverter.fromYarnQueueInfo(client.getAllQueues(), this.conf);
     } catch (YarnException e) {
       throw new IOException(e);
     }
@@ -164,7 +218,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
 
   public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
     try {
-      return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(),
+      return TypeConverter.fromYarnQueueInfo(client.getRootQueueInfos(),
           this.conf);
     } catch (YarnException e) {
       throw new IOException(e);
@@ -174,7 +228,7 @@ public class ResourceMgrDelegate extends YarnClientImpl {
   public QueueInfo[] getChildQueues(String parent) throws IOException,
       InterruptedException {
     try {
-      return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent),
+      return TypeConverter.fromYarnQueueInfo(client.getChildQueueInfos(parent),
         this.conf);
     } catch (YarnException e) {
       throw new IOException(e);
@@ -216,4 +270,82 @@ public class ResourceMgrDelegate extends YarnClientImpl {
   public ApplicationId getApplicationId() {
     return applicationId;
   }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication() throws YarnException,
+      IOException {
+    return client.getNewApplication();
+  }
+
+  @Override
+  public ApplicationId
+      submitApplication(ApplicationSubmissionContext appContext)
+          throws YarnException, IOException {
+    return client.submitApplication(appContext);
+  }
+
+  @Override
+  public void killApplication(ApplicationId applicationId)
+      throws YarnException, IOException {
+    client.killApplication(applicationId);
+  }
+
+  @Override
+  public ApplicationReport getApplicationReport(ApplicationId appId)
+      throws YarnException, IOException {
+    return client.getApplicationReport(appId);
+  }
+
+  @Override
+  public List<ApplicationReport> getApplicationList() throws YarnException,
+      IOException {
+    return client.getApplicationList();
+  }
+
+  @Override
+  public YarnClusterMetrics getYarnClusterMetrics() throws YarnException,
+      IOException {
+    return client.getYarnClusterMetrics();
+  }
+
+  @Override
+  public List<NodeReport> getNodeReports() throws YarnException, IOException {
+    return client.getNodeReports();
+  }
+
+  @Override
+  public org.apache.hadoop.yarn.api.records.Token getRMDelegationToken(
+      Text renewer) throws YarnException, IOException {
+    return client.getRMDelegationToken(renewer);
+  }
+
+  @Override
+  public org.apache.hadoop.yarn.api.records.QueueInfo getQueueInfo(
+      String queueName) throws YarnException, IOException {
+    return client.getQueueInfo(queueName);
+  }
+
+  @Override
+  public List<org.apache.hadoop.yarn.api.records.QueueInfo> getAllQueues()
+      throws YarnException, IOException {
+    return client.getAllQueues();
+  }
+
+  @Override
+  public List<org.apache.hadoop.yarn.api.records.QueueInfo> getRootQueueInfos()
+      throws YarnException, IOException {
+    return client.getRootQueueInfos();
+  }
+
+  @Override
+  public List<org.apache.hadoop.yarn.api.records.QueueInfo> getChildQueueInfos(
+      String parent) throws YarnException, IOException {
+    return client.getChildQueueInfos(parent);
+  }
+
+  @Override
+  public List<QueueUserACLInfo> getQueueAclsInfo() throws YarnException,
+      IOException {
+    return client.getQueueAclsInfo();
+  }
 }

+ 7 - 4
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
@@ -67,8 +68,9 @@ public class TestResourceMgrDelegate {
     ResourceMgrDelegate delegate = new ResourceMgrDelegate(
       new YarnConfiguration()) {
       @Override
-      protected void serviceStart() {
-        this.rmClient = applicationsManager;
+      protected void serviceStart() throws Exception {
+        Assert.assertTrue(this.client instanceof YarnClientImpl);
+        ((YarnClientImpl) this.client).setRMClient(applicationsManager);
       }
     };
     delegate.getRootQueues();
@@ -110,8 +112,9 @@ public class TestResourceMgrDelegate {
     ResourceMgrDelegate resourceMgrDelegate = new ResourceMgrDelegate(
       new YarnConfiguration()) {
       @Override
-      protected void serviceStart() {
-        this.rmClient = applicationsManager;
+      protected void serviceStart() throws Exception {
+        Assert.assertTrue(this.client instanceof YarnClientImpl);
+        ((YarnClientImpl) this.client).setRMClient(applicationsManager);
       }
     };
     JobStatus[] allJobs = resourceMgrDelegate.getAllJobs();

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java

@@ -82,6 +82,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -200,8 +201,9 @@ public class TestYARNRunner extends TestCase {
     final ApplicationClientProtocol clientRMProtocol = mock(ApplicationClientProtocol.class);
     ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf) {
       @Override
-      protected void serviceStart() {
-        this.rmClient = clientRMProtocol;
+      protected void serviceStart() throws Exception {
+        assertTrue(this.client instanceof YarnClientImpl);
+        ((YarnClientImpl) this.client).setRMClient(clientRMProtocol);
       }
     };
     /* make sure kill calls finish application master */

+ 4 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestYarnClientProtocolProvider.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -110,8 +111,9 @@ public class TestYarnClientProtocolProvider extends TestCase {
       ResourceMgrDelegate rmgrDelegate = new ResourceMgrDelegate(
           new YarnConfiguration(conf)) {
         @Override
-        protected void serviceStart() {
-          this.rmClient = cRMProtocol;
+        protected void serviceStart() throws Exception {
+          assertTrue(this.client instanceof YarnClientImpl);
+          ((YarnClientImpl) this.client).setRMClient(cRMProtocol);
         }
       };
       yrunner.setResourceMgrDelegate(rmgrDelegate);

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -142,6 +142,10 @@ Release 2.1.0-beta - UNRELEASED
     YARN-610. ClientToken is no longer set in the environment of the Containers.
     (Omkar Vinit Joshi via vinodkv)
 
+    YARN-834. Fixed annotations for yarn-client module, reorganized packages and
+    clearly differentiated *Async apis. (Arun C Murthy and Zhijie Shen via
+    vinodkv)
+
   NEW FEATURES
 
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 

+ 9 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -67,9 +67,9 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -436,17 +436,18 @@ public class ApplicationMaster {
    * @throws YarnException
    * @throws IOException
    */
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings({ "unchecked" })
   public boolean run() throws YarnException, IOException {
     LOG.info("Starting ApplicationMaster");
 
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
-    resourceManager = new AMRMClientAsync(appAttemptID, 1000, allocListener);
+    resourceManager = 
+        AMRMClientAsync.createAMRMClientAsync(appAttemptID, 1000, allocListener);
     resourceManager.init(conf);
     resourceManager.start();
 
     containerListener = new NMCallbackHandler();
-    nmClientAsync = new NMClientAsync(containerListener);
+    nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
     nmClientAsync.init(conf);
     nmClientAsync.start();
 
@@ -682,7 +683,7 @@ public class ApplicationMaster {
       }
       Container container = containers.get(containerId);
       if (container != null) {
-        nmClientAsync.getContainerStatus(containerId, container.getNodeId(),
+        nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
             container.getContainerToken());
       }
     }
@@ -802,7 +803,7 @@ public class ApplicationMaster {
       ctx.setCommands(commands);
 
       containerListener.addContainer(container.getId(), container);
-      nmClientAsync.startContainer(container, ctx);
+      nmClientAsync.startContainerAsync(container, ctx);
     }
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -60,7 +60,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;

+ 1 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java

@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -36,12 +36,13 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.collect.ImmutableList;
 
 @InterfaceAudience.Public
-@InterfaceStability.Unstable
+@InterfaceStability.Stable
 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
 
@@ -53,7 +54,7 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
    * AMRMClient.<T>createAMRMClientContainerRequest(appAttemptId)
    * }</pre>
    * @param appAttemptId the appAttemptId associated with the AMRMClient
-   * @return the newly created AMRMClient instance.
+   * @return the newly create AMRMClient instance.
    */
   @Public
   public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient(

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -33,10 +33,11 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 @InterfaceAudience.Public
-@InterfaceStability.Unstable
+@InterfaceStability.Stable
 public abstract class NMClient extends AbstractService {
 
   /**

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/YarnClient.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -37,10 +37,11 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 @InterfaceAudience.Public
-@InterfaceStability.Evolving
+@InterfaceStability.Stable
 public abstract class YarnClient extends AbstractService {
 
   /**

+ 245 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java

@@ -0,0 +1,245 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>AMRMClientAsync</code> handles communication with the ResourceManager
+ * and provides asynchronous updates on events such as container allocations and
+ * completions.  It contains a thread that sends periodic heartbeats to the
+ * ResourceManager.
+ * 
+ * It should be used by implementing a CallbackHandler:
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
+ *   public void onContainersAllocated(List<Container> containers) {
+ *     [run tasks on the containers]
+ *   }
+ *   
+ *   public void onContainersCompleted(List<ContainerStatus> statuses) {
+ *     [update progress, check whether app is done]
+ *   }
+ *   
+ *   public void onNodesUpdated(List<NodeReport> updated) {}
+ *   
+ *   public void onReboot() {}
+ * }
+ * }
+ * </pre>
+ * 
+ * The client's lifecycle should be managed similarly to the following:
+ * 
+ * <pre>
+ * {@code
+ * AMRMClientAsync asyncClient = 
+ *     createAMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * RegisterApplicationMasterResponse response = asyncClient
+ *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+ *       appMasterTrackingUrl);
+ * asyncClient.addContainerRequest(containerRequest);
+ * [... wait for application to complete]
+ * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class AMRMClientAsync<T extends ContainerRequest> 
+extends AbstractService {
+  
+  protected final AMRMClient<T> client;
+  protected final CallbackHandler handler;
+  protected final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
+
+  public static <T extends ContainerRequest> AMRMClientAsync<T> 
+  createAMRMClientAsync(
+      ApplicationAttemptId id, 
+      int intervalMs, 
+      CallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(id, intervalMs, callbackHandler);
+  }
+  
+  public static <T extends ContainerRequest> AMRMClientAsync<T> 
+  createAMRMClientAsync(
+      AMRMClient<T> client, 
+      int intervalMs, 
+      CallbackHandler callbackHandler) {
+    return new AMRMClientAsyncImpl<T>(client, intervalMs, callbackHandler);
+  }
+  
+  protected AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+      CallbackHandler callbackHandler) {
+    this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
+  }
+  
+  @Private
+  @VisibleForTesting
+  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
+      CallbackHandler callbackHandler) {
+    super(AMRMClientAsync.class.getName());
+    this.client = client;
+    this.heartbeatIntervalMs.set(intervalMs);
+    this.handler = callbackHandler;
+  }
+    
+  public void setHeartbeatInterval(int interval) {
+    heartbeatIntervalMs.set(interval);
+  }
+  
+  public abstract List<? extends Collection<T>> getMatchingRequests(
+                                                   Priority priority, 
+                                                   String resourceName, 
+                                                   Resource capability);
+  
+  /**
+   * Registers this application master with the resource manager. On successful
+   * registration, starts the heartbeating thread.
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract RegisterApplicationMasterResponse registerApplicationMaster(
+      String appHostName, int appHostPort, String appTrackingUrl)
+      throws YarnException, IOException;
+
+  /**
+   * Unregister the application master. This must be called in the end.
+   * @param appStatus Success/Failure status of the master
+   * @param appMessage Diagnostics message on failure
+   * @param appTrackingUrl New URL to get master info
+   * @throws YarnException
+   * @throws IOException
+   */
+  public abstract void unregisterApplicationMaster(
+      FinalApplicationStatus appStatus, String appMessage, String appTrackingUrl) 
+  throws YarnException, IOException;
+
+  /**
+   * Request containers for resources before calling <code>allocate</code>
+   * @param req Resource request
+   */
+  public abstract void addContainerRequest(T req);
+
+  /**
+   * Remove previous container request. The previous container request may have 
+   * already been sent to the ResourceManager. So even after the remove request 
+   * the app must be prepared to receive an allocation for the previous request 
+   * even after the remove request
+   * @param req Resource request
+   */
+  public abstract void removeContainerRequest(T req);
+
+  /**
+   * Release containers assigned by the Resource Manager. If the app cannot use
+   * the container or wants to give up the container then it can release them.
+   * The app needs to make new requests for the released resource capability if
+   * it still needs it. eg. it released non-local resources
+   * @param containerId
+   */
+  public abstract void releaseAssignedContainer(ContainerId containerId);
+
+  /**
+   * Get the currently available resources in the cluster.
+   * A valid value is available after a call to allocate has been made
+   * @return Currently available resources
+   */
+  public abstract Resource getClusterAvailableResources();
+
+  /**
+   * Get the current number of nodes in the cluster.
+   * A valid values is available after a call to allocate has been made
+   * @return Current number of nodes in the cluster
+   */
+  public abstract int getClusterNodeCount();
+
+  /**
+   * It returns the NMToken received on allocate call. It will not communicate
+   * with RM to get NMTokens. On allocate call whenever we receive new token
+   * along with new container AMRMClientAsync will cache this NMToken per node
+   * manager. This map returned should be shared with any application which is
+   * communicating with NodeManager (ex. NMClient / NMClientAsync) using
+   * NMTokens. If a new NMToken is received for the same node manager
+   * then it will be replaced. 
+   */
+  public abstract ConcurrentMap<String, Token> getNMTokens();
+  
+  public interface CallbackHandler {
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with completed
+     * containers. If the response contains both completed containers and
+     * allocated containers, this will be called before containersAllocated.
+     */
+    public void onContainersCompleted(List<ContainerStatus> statuses);
+    
+    /**
+     * Called when the ResourceManager responds to a heartbeat with allocated
+     * containers. If the response containers both completed containers and
+     * allocated containers, this will be called after containersCompleted.
+     */
+    public void onContainersAllocated(List<Container> containers);
+    
+    /**
+     * Called when the ResourceManager wants the ApplicationMaster to shutdown
+     * for being out of sync etc. The ApplicationMaster should not unregister
+     * with the RM unless the ApplicationMaster wants to be the last attempt.
+     */
+    public void onShutdownRequest();
+    
+    /**
+     * Called when nodes tracked by the ResourceManager have changed in health,
+     * availability etc.
+     */
+    public void onNodesUpdated(List<NodeReport> updatedNodes);
+    
+    public float getProgress();
+    
+    public void onError(Exception e);
+  }
+}

+ 235 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java

@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.async;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Stable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <code>NMClientAsync</code> handles communication with all the NodeManagers
+ * and provides asynchronous updates on getting responses from them. It
+ * maintains a thread pool to communicate with individual NMs where a number of
+ * worker threads process requests to NMs by using {@link NMClientImpl}. The max
+ * size of the thread pool is configurable through
+ * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
+ *
+ * It should be used in conjunction with a CallbackHandler. For example
+ *
+ * <pre>
+ * {@code
+ * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
+ *   public void onContainerStarted(ContainerId containerId,
+ *       Map<String, ByteBuffer> allServiceResponse) {
+ *     [post process after the container is started, process the response]
+ *   }
+ *
+ *   public void onContainerStatusReceived(ContainerId containerId,
+ *       ContainerStatus containerStatus) {
+ *     [make use of the status of the container]
+ *   }
+ *
+ *   public void onContainerStopped(ContainerId containerId) {
+ *     [post process after the container is stopped]
+ *   }
+ *
+ *   public void onStartContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onGetContainerStatusError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ *
+ *   public void onStopContainerError(
+ *       ContainerId containerId, Throwable t) {
+ *     [handle the raised exception]
+ *   }
+ * }
+ * }
+ * </pre>
+ *
+ * The client's life-cycle should be managed like the following:
+ *
+ * <pre>
+ * {@code
+ * NMClientAsync asyncClient = 
+ *     NMClientAsync.createNMClientAsync(new MyCallbackhandler());
+ * asyncClient.init(conf);
+ * asyncClient.start();
+ * asyncClient.startContainer(container, containerLaunchContext);
+ * [... wait for container being started]
+ * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... handle the status in the callback instance]
+ * asyncClient.stopContainer(container.getId(), container.getNodeId(),
+ *     container.getContainerToken());
+ * [... wait for container being stopped]
+ * asyncClient.stop();
+ * }
+ * </pre>
+ */
+@Public
+@Stable
+public abstract class NMClientAsync extends AbstractService {
+
+  protected NMClient client;
+  protected CallbackHandler callbackHandler;
+
+  public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
+    return new NMClientAsyncImpl(callbackHandler);
+  }
+  
+  protected NMClientAsync(CallbackHandler callbackHandler) {
+    this (NMClientAsync.class.getName(), callbackHandler);
+  }
+
+  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
+    this (name, new NMClientImpl(), callbackHandler);
+  }
+
+  @Private
+  @VisibleForTesting
+  protected NMClientAsync(String name, NMClient client,
+      CallbackHandler callbackHandler) {
+    super(name);
+    this.setClient(client);
+    this.setCallbackHandler(callbackHandler);
+  }
+
+  public abstract void startContainerAsync(
+      Container container, ContainerLaunchContext containerLaunchContext);
+
+  public abstract void stopContainerAsync(
+      ContainerId containerId, NodeId nodeId, Token containerToken);
+
+  public abstract void getContainerStatusAsync(
+      ContainerId containerId, NodeId nodeId, Token containerToken);
+  
+  public NMClient getClient() {
+    return client;
+  }
+
+  public void setClient(NMClient client) {
+    this.client = client;
+  }
+
+  public CallbackHandler getCallbackHandler() {
+    return callbackHandler;
+  }
+
+  public void setCallbackHandler(CallbackHandler callbackHandler) {
+    this.callbackHandler = callbackHandler;
+  }
+
+  /**
+   * <p>
+   * The callback interface needs to be implemented by {@link NMClientAsync}
+   * users. The APIs are called when responses from <code>NodeManager</code> are
+   * available.
+   * </p>
+   *
+   * <p>
+   * Once a callback happens, the users can chose to act on it in blocking or
+   * non-blocking manner. If the action on callback is done in a blocking
+   * manner, some of the threads performing requests on NodeManagers may get
+   * blocked depending on how many threads in the pool are busy.
+   * </p>
+   *
+   * <p>
+   * The implementation of the callback function should not throw the
+   * unexpected exception. Otherwise, {@link NMClientAsync} will just
+   * catch, log and then ignore it.
+   * </p>
+   */
+  public static interface CallbackHandler {
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate its
+     * acceptance of the starting container request
+     * @param containerId the Id of the container
+     * @param allServiceResponse a Map between the auxiliary service names and
+     *                           their outputs
+     */
+    void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds with the status
+     * of the container
+     * @param containerId the Id of the container
+     * @param containerStatus the status of the container
+     */
+    void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus);
+
+    /**
+     * The API is called when <code>NodeManager</code> responds to indicate the
+     * container is stopped.
+     * @param containerId the Id of the container
+     */
+    void onContainerStopped(ContainerId containerId);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * starting a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStartContainerError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * querying the status of a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onGetContainerStatusError(ContainerId containerId, Throwable t);
+
+    /**
+     * The API is called when an exception is raised in the process of
+     * stopping a container
+     *
+     * @param containerId the Id of the container
+     * @param t the raised exception
+     */
+    void onStopContainerError(ContainerId containerId, Throwable t);
+
+  }
+
+}

+ 12 - 93
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -24,15 +24,12 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -44,65 +41,24 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
 
-/**
- * <code>AMRMClientAsync</code> handles communication with the ResourceManager
- * and provides asynchronous updates on events such as container allocations and
- * completions.  It contains a thread that sends periodic heartbeats to the
- * ResourceManager.
- * 
- * It should be used by implementing a CallbackHandler:
- * <pre>
- * {@code
- * class MyCallbackHandler implements AMRMClientAsync.CallbackHandler {
- *   public void onContainersAllocated(List<Container> containers) {
- *     [run tasks on the containers]
- *   }
- *   
- *   public void onContainersCompleted(List<ContainerStatus> statuses) {
- *     [update progress, check whether app is done]
- *   }
- *   
- *   public void onNodesUpdated(List<NodeReport> updated) {}
- *   
- *   public void onReboot() {}
- * }
- * }
- * </pre>
- * 
- * The client's lifecycle should be managed similarly to the following:
- * 
- * <pre>
- * {@code
- * AMRMClientAsync asyncClient = new AMRMClientAsync(appAttId, 1000, new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * RegisterApplicationMasterResponse response = asyncClient
- *    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
- *       appMasterTrackingUrl);
- * asyncClient.addContainerRequest(containerRequest);
- * [... wait for application to complete]
- * asyncClient.unregisterApplicationMaster(status, appMsg, trackingUrl);
- * asyncClient.stop();
- * }
- * </pre>
- */
+@Private
 @Unstable
-@Evolving
-public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService {
+public class AMRMClientAsyncImpl<T extends ContainerRequest> 
+extends AMRMClientAsync<T> {
   
-  private static final Log LOG = LogFactory.getLog(AMRMClientAsync.class);
+  private static final Log LOG = LogFactory.getLog(AMRMClientAsyncImpl.class);
   
-  private final AMRMClient<T> client;
-  private final AtomicInteger heartbeatIntervalMs = new AtomicInteger();
   private final HeartbeatThread heartbeatThread;
   private final CallbackHandlerThread handlerThread;
-  private final CallbackHandler handler;
 
   private final BlockingQueue<AllocateResponse> responseQueue;
   
@@ -113,19 +69,16 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
   
   private volatile Exception savedException;
   
-  public AMRMClientAsync(ApplicationAttemptId id, int intervalMs,
+  public AMRMClientAsyncImpl(ApplicationAttemptId id, int intervalMs,
       CallbackHandler callbackHandler) {
     this(new AMRMClientImpl<T>(id), intervalMs, callbackHandler);
   }
   
   @Private
   @VisibleForTesting
-  protected AMRMClientAsync(AMRMClient<T> client, int intervalMs,
+  public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
       CallbackHandler callbackHandler) {
-    super(AMRMClientAsync.class.getName());
-    this.client = client;
-    this.heartbeatIntervalMs.set(intervalMs);
-    handler = callbackHandler;
+    super(client, intervalMs, callbackHandler);
     heartbeatThread = new HeartbeatThread();
     handlerThread = new CallbackHandlerThread();
     responseQueue = new LinkedBlockingQueue<AllocateResponse>();
@@ -386,38 +339,4 @@ public class AMRMClientAsync<T extends ContainerRequest> extends AbstractService
       }
     }
   }
-  
-  public interface CallbackHandler {
-    
-    /**
-     * Called when the ResourceManager responds to a heartbeat with completed
-     * containers. If the response contains both completed containers and
-     * allocated containers, this will be called before containersAllocated.
-     */
-    public void onContainersCompleted(List<ContainerStatus> statuses);
-    
-    /**
-     * Called when the ResourceManager responds to a heartbeat with allocated
-     * containers. If the response containers both completed containers and
-     * allocated containers, this will be called after containersCompleted.
-     */
-    public void onContainersAllocated(List<Container> containers);
-    
-    /**
-     * Called when the ResourceManager wants the ApplicationMaster to shutdown
-     * for being out of sync etc. The ApplicationMaster should not unregister
-     * with the RM unless the ApplicationMaster wants to be the last attempt.
-     */
-    public void onShutdownRequest();
-    
-    /**
-     * Called when nodes tracked by the ResourceManager have changed in health,
-     * availability etc.
-     */
-    public void onNodesUpdated(List<NodeReport> updatedNodes);
-    
-    public float getProgress();
-    
-    public void onError(Exception e);
-  }
 }

+ 24 - 166
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientAsync.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -39,16 +39,17 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -63,75 +64,11 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-/**
- * <code>NMClientAsync</code> handles communication with all the NodeManagers
- * and provides asynchronous updates on getting responses from them. It
- * maintains a thread pool to communicate with individual NMs where a number of
- * worker threads process requests to NMs by using {@link NMClientImpl}. The max
- * size of the thread pool is configurable through
- * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE}.
- *
- * It should be used in conjunction with a CallbackHandler. For example
- *
- * <pre>
- * {@code
- * class MyCallbackHandler implements NMClientAsync.CallbackHandler {
- *   public void onContainerStarted(ContainerId containerId,
- *       Map<String, ByteBuffer> allServiceResponse) {
- *     [post process after the container is started, process the response]
- *   }
- *
- *   public void onContainerStatusReceived(ContainerId containerId,
- *       ContainerStatus containerStatus) {
- *     [make use of the status of the container]
- *   }
- *
- *   public void onContainerStopped(ContainerId containerId) {
- *     [post process after the container is stopped]
- *   }
- *
- *   public void onStartContainerError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- *
- *   public void onGetContainerStatusError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- *
- *   public void onStopContainerError(
- *       ContainerId containerId, Throwable t) {
- *     [handle the raised exception]
- *   }
- * }
- * }
- * </pre>
- *
- * The client's life-cycle should be managed like the following:
- *
- * <pre>
- * {@code
- * NMClientAsync asyncClient = new NMClientAsync(new MyCallbackhandler());
- * asyncClient.init(conf);
- * asyncClient.start();
- * asyncClient.startContainer(container, containerLaunchContext);
- * [... wait for container being started]
- * asyncClient.getContainerStatus(container.getId(), container.getNodeId(),
- *     container.getContainerToken());
- * [... handle the status in the callback instance]
- * asyncClient.stopContainer(container.getId(), container.getNodeId(),
- *     container.getContainerToken());
- * [... wait for container being stopped]
- * asyncClient.stop();
- * }
- * </pre>
- */
+@Private
 @Unstable
-@Evolving
-public class NMClientAsync extends AbstractService {
+public class NMClientAsyncImpl extends NMClientAsync {
 
-  private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
+  private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
 
   protected static final int INITIAL_THREAD_POOL_SIZE = 10;
 
@@ -142,25 +79,22 @@ public class NMClientAsync extends AbstractService {
   protected BlockingQueue<ContainerEvent> events =
       new LinkedBlockingQueue<ContainerEvent>();
 
-  protected NMClient client;
-  protected CallbackHandler callbackHandler;
-
   protected ConcurrentMap<ContainerId, StatefulContainer> containers =
       new ConcurrentHashMap<ContainerId, StatefulContainer>();
 
-  public NMClientAsync(CallbackHandler callbackHandler) {
-    this (NMClientAsync.class.getName(), callbackHandler);
+  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
+    this (NMClientAsyncImpl.class.getName(), callbackHandler);
   }
 
-  public NMClientAsync(String name, CallbackHandler callbackHandler) {
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
     this (name, new NMClientImpl(), callbackHandler);
   }
 
   @Private
   @VisibleForTesting
-  protected NMClientAsync(String name, NMClient client,
+  protected NMClientAsyncImpl(String name, NMClient client,
       CallbackHandler callbackHandler) {
-    super(name);
+    super(name, client, callbackHandler);
     this.client = client;
     this.callbackHandler = callbackHandler;
   }
@@ -268,7 +202,7 @@ public class NMClientAsync extends AbstractService {
       // If NMClientImpl doesn't stop running containers, the states doesn't
       // need to be cleared.
       if (!(client instanceof NMClientImpl) ||
-          ((NMClientImpl) client).cleanupRunningContainers.get()) {
+          ((NMClientImpl) client).getCleanupRunningContainers().get()) {
         if (containers != null) {
           containers.clear();
         }
@@ -278,7 +212,7 @@ public class NMClientAsync extends AbstractService {
     super.serviceStop();
   }
 
-  public void startContainer(
+  public void startContainerAsync(
       Container container, ContainerLaunchContext containerLaunchContext) {
     if (containers.putIfAbsent(container.getId(),
         new StatefulContainer(this, container.getId())) != null) {
@@ -295,7 +229,7 @@ public class NMClientAsync extends AbstractService {
     }
   }
 
-  public void stopContainer(ContainerId containerId, NodeId nodeId,
+  public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
       Token containerToken) {
     if (containers.get(containerId) == null) {
       callbackHandler.onStopContainerError(containerId,
@@ -312,7 +246,7 @@ public class NMClientAsync extends AbstractService {
     }
   }
 
-  public void getContainerStatus(ContainerId containerId, NodeId nodeId,
+  public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
       Token containerToken) {
     try {
       events.put(new ContainerEvent(containerId, nodeId, containerToken,
@@ -443,10 +377,10 @@ public class NMClientAsync extends AbstractService {
           }
           assert scEvent != null;
           Map<String, ByteBuffer> allServiceResponse =
-              container.nmClientAsync.client.startContainer(
+              container.nmClientAsync.getClient().startContainer(
                   scEvent.getContainer(), scEvent.getContainerLaunchContext());
           try {
-            container.nmClientAsync.callbackHandler.onContainerStarted(
+            container.nmClientAsync.getCallbackHandler().onContainerStarted(
                 containerId, allServiceResponse);
           } catch (Throwable thr) {
             // Don't process user created unchecked exception
@@ -466,7 +400,7 @@ public class NMClientAsync extends AbstractService {
       private ContainerState onExceptionRaised(StatefulContainer container,
           ContainerEvent event, Throwable t) {
         try {
-          container.nmClientAsync.callbackHandler.onStartContainerError(
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
               event.getContainerId(), t);
         } catch (Throwable thr) {
           // Don't process user created unchecked exception
@@ -487,10 +421,10 @@ public class NMClientAsync extends AbstractService {
           StatefulContainer container, ContainerEvent event) {
         ContainerId containerId = event.getContainerId();
         try {
-          container.nmClientAsync.client.stopContainer(
+          container.nmClientAsync.getClient().stopContainer(
               containerId, event.getNodeId(), event.getContainerToken());
           try {
-            container.nmClientAsync.callbackHandler.onContainerStopped(
+            container.nmClientAsync.getCallbackHandler().onContainerStopped(
                 event.getContainerId());
           } catch (Throwable thr) {
             // Don't process user created unchecked exception
@@ -510,7 +444,7 @@ public class NMClientAsync extends AbstractService {
       private ContainerState onExceptionRaised(StatefulContainer container,
           ContainerEvent event, Throwable t) {
         try {
-          container.nmClientAsync.callbackHandler.onStopContainerError(
+          container.nmClientAsync.getCallbackHandler().onStopContainerError(
               event.getContainerId(), t);
         } catch (Throwable thr) {
           // Don't process user created unchecked exception
@@ -530,7 +464,7 @@ public class NMClientAsync extends AbstractService {
       @Override
       public void transition(StatefulContainer container, ContainerEvent event) {
         try {
-          container.nmClientAsync.callbackHandler.onStartContainerError(
+          container.nmClientAsync.getCallbackHandler().onStartContainerError(
               event.getContainerId(),
               RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
         } catch (Throwable thr) {
@@ -641,80 +575,4 @@ public class NMClientAsync extends AbstractService {
     }
   }
 
-  /**
-   * <p>
-   * The callback interface needs to be implemented by {@link NMClientAsync}
-   * users. The APIs are called when responses from <code>NodeManager</code> are
-   * available.
-   * </p>
-   *
-   * <p>
-   * Once a callback happens, the users can chose to act on it in blocking or
-   * non-blocking manner. If the action on callback is done in a blocking
-   * manner, some of the threads performing requests on NodeManagers may get
-   * blocked depending on how many threads in the pool are busy.
-   * </p>
-   *
-   * <p>
-   * The implementation of the callback function should not throw the
-   * unexpected exception. Otherwise, {@link NMClientAsync} will just
-   * catch, log and then ignore it.
-   * </p>
-   */
-  public static interface CallbackHandler {
-    /**
-     * The API is called when <code>NodeManager</code> responds to indicate its
-     * acceptance of the starting container request
-     * @param containerId the Id of the container
-     * @param allServiceResponse a Map between the auxiliary service names and
-     *                           their outputs
-     */
-    void onContainerStarted(ContainerId containerId,
-        Map<String, ByteBuffer> allServiceResponse);
-
-    /**
-     * The API is called when <code>NodeManager</code> responds with the status
-     * of the container
-     * @param containerId the Id of the container
-     * @param containerStatus the status of the container
-     */
-    void onContainerStatusReceived(ContainerId containerId,
-        ContainerStatus containerStatus);
-
-    /**
-     * The API is called when <code>NodeManager</code> responds to indicate the
-     * container is stopped.
-     * @param containerId the Id of the container
-     */
-    void onContainerStopped(ContainerId containerId);
-
-    /**
-     * The API is called when an exception is raised in the process of
-     * starting a container
-     *
-     * @param containerId the Id of the container
-     * @param t the raised exception
-     */
-    void onStartContainerError(ContainerId containerId, Throwable t);
-
-    /**
-     * The API is called when an exception is raised in the process of
-     * querying the status of a container
-     *
-     * @param containerId the Id of the container
-     * @param t the raised exception
-     */
-    void onGetContainerStatusError(ContainerId containerId, Throwable t);
-
-    /**
-     * The API is called when an exception is raised in the process of
-     * stopping a container
-     *
-     * @param containerId the Id of the container
-     * @param t the raised exception
-     */
-    void onStopContainerError(ContainerId containerId, Throwable t);
-
-  }
-
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.async;
+import org.apache.hadoop.classification.InterfaceAudience;
+

+ 30 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -57,7 +57,8 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -71,6 +72,7 @@ import com.google.common.base.Joiner;
 
 // TODO check inputs for null etc. YARN-654
 
+@Private
 @Unstable
 public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
 
@@ -312,64 +314,64 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> {
   @Override
   public synchronized void addContainerRequest(T req) {
     Set<String> allRacks = new HashSet<String>();
-    if (req.racks != null) {
-      allRacks.addAll(req.racks);
-      if(req.racks.size() != allRacks.size()) {
+    if (req.getRacks() != null) {
+      allRacks.addAll(req.getRacks());
+      if(req.getRacks().size() != allRacks.size()) {
         Joiner joiner = Joiner.on(',');
         LOG.warn("ContainerRequest has duplicate racks: "
-            + joiner.join(req.racks));
+            + joiner.join(req.getRacks()));
       }
     }
-    allRacks.addAll(resolveRacks(req.nodes));
+    allRacks.addAll(resolveRacks(req.getNodes()));
     
-    if (req.nodes != null) {
-      HashSet<String> dedupedNodes = new HashSet<String>(req.nodes);
-      if(dedupedNodes.size() != req.nodes.size()) {
+    if (req.getNodes() != null) {
+      HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
+      if(dedupedNodes.size() != req.getNodes().size()) {
         Joiner joiner = Joiner.on(',');
         LOG.warn("ContainerRequest has duplicate nodes: "
-            + joiner.join(req.nodes));        
+            + joiner.join(req.getNodes()));        
       }
       for (String node : dedupedNodes) {
         // Ensure node requests are accompanied by requests for
         // corresponding rack
-        addResourceRequest(req.priority, node, req.capability,
-            req.containerCount, req);
+        addResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
       }
     }
 
     for (String rack : allRacks) {
-      addResourceRequest(req.priority, rack, req.capability,
-          req.containerCount, req);
+      addResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
     }
 
     // Off-switch
-    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.containerCount, req);
+    addResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+        req.getContainerCount(), req);
   }
 
   @Override
   public synchronized void removeContainerRequest(T req) {
     Set<String> allRacks = new HashSet<String>();
-    if (req.racks != null) {
-      allRacks.addAll(req.racks);
+    if (req.getRacks() != null) {
+      allRacks.addAll(req.getRacks());
     }
-    allRacks.addAll(resolveRacks(req.nodes));
+    allRacks.addAll(resolveRacks(req.getNodes()));
 
     // Update resource requests
-    if (req.nodes != null) {
-      for (String node : new HashSet<String>(req.nodes)) {
-        decResourceRequest(req.priority, node, req.capability,
-            req.containerCount, req);
+    if (req.getNodes() != null) {
+      for (String node : new HashSet<String>(req.getNodes())) {
+        decResourceRequest(req.getPriority(), node, req.getCapability(),
+            req.getContainerCount(), req);
       }
     }
 
     for (String rack : allRacks) {
-      decResourceRequest(req.priority, rack, req.capability,
-          req.containerCount, req);
+      decResourceRequest(req.getPriority(), rack, req.getCapability(),
+          req.getContainerCount(), req);
     }
 
-    decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
-        req.containerCount, req);
+    decResourceRequest(req.getPriority(), ResourceRequest.ANY, req.getCapability(),
+        req.getContainerCount(), req);
   }
 
   @Override

+ 15 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/NMClientImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -76,6 +79,8 @@ import org.apache.hadoop.yarn.util.Records;
  * {@link #stopContainer}.
  * </p>
  */
+@Private
+@Unstable
 public class NMClientImpl extends NMClient {
 
   private static final Log LOG = LogFactory.getLog(NMClientImpl.class);
@@ -86,7 +91,7 @@ public class NMClientImpl extends NMClient {
       new ConcurrentHashMap<ContainerId, StartedContainer>();
 
   //enabled by default
-  protected final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
 
   public NMClientImpl() {
     super(NMClientImpl.class.getName());
@@ -100,7 +105,7 @@ public class NMClientImpl extends NMClient {
   protected void serviceStop() throws Exception {
     // Usually, started-containers are stopped when this client stops. Unless
     // the flag cleanupRunningContainers is set to false.
-    if (cleanupRunningContainers.get()) {
+    if (getCleanupRunningContainers().get()) {
       cleanupRunningContainers();
     }
     super.serviceStop();
@@ -126,7 +131,7 @@ public class NMClientImpl extends NMClient {
 
   @Override
   public void cleanupRunningContainersOnStop(boolean enabled) {
-    cleanupRunningContainers.set(enabled);
+    getCleanupRunningContainers().set(enabled);
   }
 
   protected static class StartedContainer {
@@ -171,7 +176,7 @@ public class NMClientImpl extends NMClient {
     }
 
     @Override
-    protected void serviceStart() throws Exception {
+    protected synchronized void serviceStart() throws Exception {
       final YarnRPC rpc = YarnRPC.create(getConfig());
 
       final InetSocketAddress containerAddress =
@@ -199,7 +204,7 @@ public class NMClientImpl extends NMClient {
     }
 
     @Override
-    protected void serviceStop() throws Exception {
+    protected synchronized void serviceStop() throws Exception {
       if (this.containerManager != null) {
         RPC.stopProxy(this.containerManager);
 
@@ -397,4 +402,8 @@ public class NMClientImpl extends NMClient {
     return startedContainers.get(containerId);
   }
 
+  public AtomicBoolean getCleanupRunningContainers() {
+    return cleanupRunningContainers;
+  }
+
 }

+ 14 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/YarnClientImpl.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -25,8 +25,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
@@ -56,13 +56,16 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 
-@InterfaceAudience.Public
-@InterfaceStability.Evolving
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
 public class YarnClientImpl extends YarnClient {
 
   private static final Log LOG = LogFactory.getLog(YarnClientImpl.class);
@@ -304,4 +307,10 @@ public class YarnClientImpl extends YarnClient {
       }
     }
   }
+
+  @Private
+  @VisibleForTesting
+  public void setRMClient(ApplicationClientProtocol rmClient) {
+    this.rmClient = rmClient;
+  }
 }

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
+

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.api;
+import org.apache.hadoop.classification.InterfaceAudience;
+

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/ApplicationCLI.java

@@ -27,12 +27,16 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+@Private
+@Unstable
 public class ApplicationCLI extends YarnCLI {
   private static final String APPLICATIONS_PATTERN =
     "%30s\t%20s\t%20s\t%10s\t%10s\t%18s\t%18s\t%15s\t%35s" +

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/NodeCLI.java

@@ -28,12 +28,16 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+@Private
+@Unstable
 public class NodeCLI extends YarnCLI {
   private static final String NODES_PATTERN = "%16s\t%10s\t%17s\t%18s" +
     System.getProperty("line.separator");

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/RMAdminCLI.java

@@ -23,6 +23,8 @@ import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.ipc.RemoteException;
@@ -42,6 +44,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 
+@Private
+@Unstable
 public class RMAdminCLI extends Configured implements Tool {
 
   private final RecordFactory recordFactory = 

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/YarnCLI.java

@@ -19,12 +19,15 @@ package org.apache.hadoop.yarn.client.cli;
 
 import java.io.PrintStream;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.client.YarnClient;
-import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
+@Private
+@Unstable
 public abstract class YarnCLI extends Configured implements Tool {
 
   public static final String STATUS_CMD = "status";

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/cli/package-info.java

@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.client.cli;
+import org.apache.hadoop.classification.InterfaceAudience;
+

+ 8 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
 
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
@@ -46,7 +46,10 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -105,7 +108,7 @@ public class TestAMRMClientAsync {
     });
     
     AMRMClientAsync<ContainerRequest> asyncClient = 
-        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
     asyncClient.init(conf);
     asyncClient.start();
     asyncClient.registerApplicationMaster("localhost", 1234, null);
@@ -160,7 +163,7 @@ public class TestAMRMClientAsync {
     when(client.allocate(anyFloat())).thenThrow(mockException);
 
     AMRMClientAsync<ContainerRequest> asyncClient = 
-        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
     asyncClient.init(conf);
     asyncClient.start();
     
@@ -195,7 +198,7 @@ public class TestAMRMClientAsync {
     when(client.allocate(anyFloat())).thenReturn(rebootResponse);
     
     AMRMClientAsync<ContainerRequest> asyncClient = 
-        new AMRMClientAsync<ContainerRequest>(client, 20, callbackHandler);
+        AMRMClientAsync.createAMRMClientAsync(client, 20, callbackHandler);
     asyncClient.init(conf);
     asyncClient.start();
     

+ 28 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClientAsync.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.async.impl;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doNothing;
@@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -62,7 +65,7 @@ public class TestNMClientAsync {
   private final RecordFactory recordFactory =
       RecordFactoryProvider.getRecordFactory(null);
 
-  private NMClientAsync asyncClient;
+  private NMClientAsyncImpl asyncClient;
   private NodeId nodeId;
   private Token containerToken;
 
@@ -71,7 +74,7 @@ public class TestNMClientAsync {
     ServiceOperations.stop(asyncClient);
   }
 
-  @Test (timeout = 30000)
+  @Test (timeout = 10000)
   public void testNMClientAsync() throws Exception {
     Configuration conf = new Configuration();
     conf.setInt(YarnConfiguration.NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE, 10);
@@ -89,40 +92,42 @@ public class TestNMClientAsync {
 
     for (int i = 0; i < expectedSuccess + expectedFailure; ++i) {
       if (i == expectedSuccess) {
-        while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+        while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
             .isAllSuccessCallsExecuted()) {
           Thread.sleep(10);
         }
-        asyncClient.client = mockNMClient(1);
+        asyncClient.setClient(mockNMClient(1));
       }
       Container container = mockContainer(i);
       ContainerLaunchContext clc =
           recordFactory.newRecordInstance(ContainerLaunchContext.class);
-      asyncClient.startContainer(container, clc);
+      asyncClient.startContainerAsync(container, clc);
     }
-    while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+    while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
         .isStartAndQueryFailureCallsExecuted()) {
       Thread.sleep(10);
     }
-    asyncClient.client = mockNMClient(2);
-    ((TestCallbackHandler1) asyncClient.callbackHandler).path = false;
+    asyncClient.setClient(mockNMClient(2));
+    ((TestCallbackHandler1) asyncClient.getCallbackHandler()).path = false;
     for (int i = 0; i < expectedFailure; ++i) {
       Container container = mockContainer(
           expectedSuccess + expectedFailure + i);
       ContainerLaunchContext clc =
           recordFactory.newRecordInstance(ContainerLaunchContext.class);
-      asyncClient.startContainer(container, clc);
+      asyncClient.startContainerAsync(container, clc);
     }
-    while (!((TestCallbackHandler1) asyncClient.callbackHandler)
+    while (!((TestCallbackHandler1) asyncClient.getCallbackHandler())
         .isStopFailureCallsExecuted()) {
       Thread.sleep(10);
     }
     for (String errorMsg :
-        ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs) {
+        ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+            .errorMsgs) {
       System.out.println(errorMsg);
     }
     Assert.assertEquals("Error occurs in CallbackHandler", 0,
-        ((TestCallbackHandler1) asyncClient.callbackHandler).errorMsgs.size());
+        ((TestCallbackHandler1) asyncClient.getCallbackHandler())
+            .errorMsgs.size());
     for (String errorMsg : ((MockNMClientAsync1) asyncClient).errorMsgs) {
       System.out.println(errorMsg);
     }
@@ -141,7 +146,7 @@ public class TestNMClientAsync {
         asyncClient.threadPool.isShutdown());
   }
 
-  private class MockNMClientAsync1 extends NMClientAsync {
+  private class MockNMClientAsync1 extends NMClientAsyncImpl {
     private Set<String> errorMsgs =
         Collections.synchronizedSet(new HashSet<String>());
 
@@ -227,10 +232,10 @@ public class TestNMClientAsync {
         actualStartSuccessArray.set(containerId.getId(), 1);
 
         // move on to the following success tests
-        asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+        asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
       } else {
         // move on to the following failure tests
-        asyncClient.stopContainer(containerId, nodeId, containerToken);
+        asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
       }
 
       // Shouldn't crash the test thread
@@ -248,7 +253,7 @@ public class TestNMClientAsync {
       actualQuerySuccess.addAndGet(1);
       actualQuerySuccessArray.set(containerId.getId(), 1);
       // move on to the following success tests
-      asyncClient.stopContainer(containerId, nodeId, containerToken);
+      asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -285,7 +290,7 @@ public class TestNMClientAsync {
       actualStartFailure.addAndGet(1);
       actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
       // move on to the following failure tests
-      asyncClient.getContainerStatus(containerId, nodeId, containerToken);
+      asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -426,22 +431,22 @@ public class TestNMClientAsync {
     Thread t = new Thread() {
       @Override
       public void run() {
-        asyncClient.startContainer(container, clc);
+        asyncClient.startContainerAsync(container, clc);
       }
     };
     t.start();
 
     barrierA.await();
-    asyncClient.stopContainer(container.getId(), container.getNodeId(),
+    asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
         container.getContainerToken());
     barrierC.await();
 
     Assert.assertFalse("Starting and stopping should be out of order",
-        ((TestCallbackHandler2) asyncClient.callbackHandler)
+        ((TestCallbackHandler2) asyncClient.getCallbackHandler())
             .exceptionOccurred.get());
   }
 
-  private class MockNMClientAsync2 extends NMClientAsync {
+  private class MockNMClientAsync2 extends NMClientAsyncImpl {
     private CyclicBarrier barrierA;
     private CyclicBarrier barrierB;
 
@@ -510,7 +515,7 @@ public class TestNMClientAsync {
 
     @Override
     public void onStartContainerError(ContainerId containerId, Throwable t) {
-      if (!t.getMessage().equals(NMClientAsync.StatefulContainer
+      if (!t.getMessage().equals(NMClientAsyncImpl.StatefulContainer
           .OutOfOrderTransition.STOP_BEFORE_START_ERROR_MSG)) {
         exceptionOccurred.set(true);
         return;

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -57,8 +57,11 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.StoredContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientContainerRequest.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientContainerRequest.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import java.util.Arrays;
 import java.util.List;
@@ -29,9 +29,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
 import org.junit.Test;
 
-import static org.apache.hadoop.yarn.client.AMRMClientImpl.ContainerRequest;
 import static org.junit.Assert.assertEquals;
 
 public class TestAMRMClientContainerRequest {
@@ -72,8 +73,8 @@ public class TestAMRMClientContainerRequest {
   private void verifyResourceRequestLocation(
       AMRMClientImpl<ContainerRequest> client, ContainerRequest request,
       String location) {
-    ResourceRequest ask =  client.remoteRequestsTable.get(request.priority)
-        .get(location).get(request.capability).remoteRequest;
+    ResourceRequest ask =  client.remoteRequestsTable.get(request.getPriority())
+        .get(location).get(request.getCapability()).remoteRequest;
     assertEquals(location, ask.getResourceName());
     assertEquals(request.getContainerCount(), ask.getNumContainers());
   }

+ 11 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestNMClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -51,7 +51,13 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -164,9 +170,9 @@ public class TestNMClient {
     // leave one unclosed
     assertEquals(1, nmClient.startedContainers.size());
     // default true
-    assertTrue(nmClient.cleanupRunningContainers.get());
+    assertTrue(nmClient.getCleanupRunningContainers().get());
     nmClient.cleanupRunningContainersOnStop(stopContainers);
-    assertEquals(stopContainers, nmClient.cleanupRunningContainers.get());
+    assertEquals(stopContainers, nmClient.getCleanupRunningContainers().get());
     nmClient.stop();
   }
 
@@ -201,7 +207,7 @@ public class TestNMClient {
     // stop the running containers on close
     assertFalse(nmClient.startedContainers.isEmpty());
     nmClient.cleanupRunningContainersOnStop(true);
-    assertTrue(nmClient.cleanupRunningContainers.get());
+    assertTrue(nmClient.getCleanupRunningContainers().get());
     nmClient.stop();
   }
 

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestYarnClient.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestYarnClient.java

@@ -16,7 +16,7 @@
 * limitations under the License.
 */
 
-package org.apache.hadoop.yarn.client;
+package org.apache.hadoop.yarn.client.api.impl;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -38,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestYarnCLI.java

@@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Before;
 import org.junit.Test;