
YARN-2317. Updated the document about how to write YARN applications. Contributed by Li Lu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617594 13f79535-47bb-0310-9956-ffa450edef68
Zhijie Shen
commit 7ad7ab1723

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -119,6 +119,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2373. Changed WebAppUtils to use Configuration#getPassword for
     accessing SSL passwords. (Larry McCay via jianhe)
 
+    YARN-2317. Updated the document about how to write YARN applications. (Li Lu via
+    zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 667 - 702
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/WritingYarnApplications.apt.vm

@@ -11,8 +11,8 @@
 ~~ limitations under the License. See accompanying LICENSE file.
 
   ---
-  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN 
-  Applications 
+  Hadoop Map Reduce Next Generation-${project.version} - Writing YARN
+  Applications
   ---
   ---
   ${maven.build.timestamp}
@@ -21,772 +21,737 @@ Hadoop MapReduce Next Generation - Writing YARN Applications
 
 %{toc|section=1|fromDepth=0}
 
-* Purpose 
+* Purpose
 
-  This document describes, at a high-level, the way to implement new 
+  This document describes, at a high-level, the way to implement new
   Applications for YARN.
 
 * Concepts and Flow
 
-  The general concept is that an 'Application Submission Client' submits an 
-  'Application' to the YARN Resource Manager. The client communicates with the 
-  ResourceManager using the 'ApplicationClientProtocol' to first acquire a new 
-  'ApplicationId' if needed via ApplicationClientProtocol#getNewApplication and then 
-  submit the 'Application' to be run via ApplicationClientProtocol#submitApplication. As 
-  part of the ApplicationClientProtocol#submitApplication call, the client needs to 
-  provide sufficient information to the ResourceManager to 'launch' the 
-  application's first container i.e. the ApplicationMaster. 
-  You need to provide information such as the details about the local 
-  files/jars that need to be available for your application to run, the actual 
-  command that needs to be executed (with the necessary command line arguments), 
-  any Unix environment settings (optional), etc. Effectively, you need to 
-  describe the Unix process(es) that needs to be launched for your 
-  ApplicationMaster. 
-
-  The YARN ResourceManager will then launch the ApplicationMaster (as specified) 
-  on an allocated container. The ApplicationMaster is then expected to 
-  communicate with the ResourceManager using the 'ApplicationMasterProtocol'. Firstly, the 
-  ApplicationMaster needs to register itself with the ResourceManager. To 
-  complete the task assigned to it, the ApplicationMaster can then request for 
-  and receive containers via ApplicationMasterProtocol#allocate. After a container is 
-  allocated to it, the ApplicationMaster communicates with the NodeManager using 
-  ContainerManager#startContainer to launch the container for its task. As part 
-  of launching this container, the ApplicationMaster has to specify the 
-  ContainerLaunchContext which, similar to the ApplicationSubmissionContext, 
-  has the launch information such as command line specification, environment, 
-  etc. Once the task is completed, the ApplicationMaster has to signal the 
-  ResourceManager of its completion via the ApplicationMasterProtocol#finishApplicationMaster. 
-
-  Meanwhile, the client can monitor the application's status by querying the 
-  ResourceManager or by directly querying the ApplicationMaster if it supports 
-  such a service. If needed, it can also kill the application via 
-  ApplicationClientProtocol#forceKillApplication.  
-
-* Interfaces 
+  The general concept is that an <application submission client> submits an
+  <application> to the YARN <ResourceManager> (RM). This can be done through
+  setting up a <<<YarnClient>>> object. After <<<YarnClient>>> is started, the
+  client can then set up application context, prepare the very first container of
+  the application that contains the <ApplicationMaster> (AM), and then submit
+  the application. You need to provide information such as the details about the
+  local files/jars that need to be available for your application to run, the
+  actual command that needs to be executed (with the necessary command line
+  arguments), any OS environment settings (optional), etc. Effectively, you
+  need to describe the Unix process(es) that needs to be launched for your
+  ApplicationMaster.
+
+  The YARN ResourceManager will then launch the ApplicationMaster (as
+  specified) on an allocated container. The ApplicationMaster communicates with
+  the YARN cluster, and handles application execution. It performs operations
+  in an asynchronous fashion. During application launch time, the main tasks of
+  the ApplicationMaster are: a) communicating with the ResourceManager to
+  negotiate and allocate resources for future containers, and b) after
+  container allocation, communicating with YARN <NodeManager>s (NMs) to launch
+  application containers on them. Task a) can be performed asynchronously
+  through an <<<AMRMClientAsync>>> object, with event handling methods
+  specified in a <<<AMRMClientAsync.CallbackHandler>>> type of event handler.
+  The event handler needs to be set on the client explicitly. Task b) can be
+  performed by launching a runnable object that launches containers as they are
+  allocated. As part of launching a container, the AM has to
+  specify the <<<ContainerLaunchContext>>> that has the launch information such as
+  command line specification, environment, etc.
+
+  During the execution of an application, the ApplicationMaster communicates
+  with NodeManagers through an <<<NMClientAsync>>> object. All container events
+  are handled by an <<<NMClientAsync.CallbackHandler>>> associated with the
+  <<<NMClientAsync>>>. A typical callback handler handles container start, stop,
+  status update and error. The ApplicationMaster also reports execution
+  progress to the ResourceManager through the <<<getProgress()>>> method of
+  <<<AMRMClientAsync.CallbackHandler>>>.
+  
+  Other than the asynchronous clients, there are synchronous versions for
+  certain workflows (<<<AMRMClient>>> and <<<NMClient>>>). The asynchronous
+  clients are recommended because of their (subjectively) simpler usage, and
+  this article will mainly cover the asynchronous clients. Please refer to
+  <<<AMRMClient>>> and <<<NMClient>>> for more information on synchronous
+  clients.
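+
+  For reference, a minimal sketch of the synchronous flow on the AM side
+  (an illustration, not the distributed shell code; it assumes <<<conf>>>, a
+  <<<containerRequest>>> and a prepared <<<ContainerLaunchContext>>> named
+  <<<ctx>>> are available) could look like the following:
+
++---+
+  AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+  rmClient.init(conf);
+  rmClient.start();
+  rmClient.registerApplicationMaster("", 0, "");
+
+  NMClient nmClient = NMClient.createNMClient();
+  nmClient.init(conf);
+  nmClient.start();
+
+  // Ask for containers, then poll the RM; each allocate() call also
+  // doubles as a heartbeat carrying the current progress.
+  rmClient.addContainerRequest(containerRequest);
+  AllocateResponse allocResponse = rmClient.allocate(0.0f);
+  for (Container container : allocResponse.getAllocatedContainers()) {
+    // launch the task on the allocated container
+    nmClient.startContainer(container, ctx);
+  }
++---+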
+
+* Interfaces
 
   The interfaces you'd most likely be concerned with are:
 
-  * ApplicationClientProtocol - Client\<--\>ResourceManager\
-    The protocol for a client that wishes to communicate with the 
-    ResourceManager to launch a new application (i.e. the ApplicationMaster), 
-    check on the status of the application or kill the application. For example, 
-    a job-client (a job launching program from the gateway) would use this 
-    protocol. 
+  * <<Client>>\<--\><<ResourceManager>>\
+    By using <<<YarnClient>>> objects.
+
+  * <<ApplicationMaster>>\<--\><<ResourceManager>>\
+    By using <<<AMRMClientAsync>>> objects, handling events asynchronously by
+    <<<AMRMClientAsync.CallbackHandler>>>
+
+  * <<ApplicationMaster>>\<--\><<NodeManager>>\
+    Launch containers. Communicate with NodeManagers
+    by using <<<NMClientAsync>>> objects, handling container events by
+    <<<NMClientAsync.CallbackHandler>>>
+
+  []
+
+  <<Note>>
 
-  * ApplicationMasterProtocol - ApplicationMaster\<--\>ResourceManager\
-    The protocol used by the ApplicationMaster to register/unregister itself 
-    to/from the ResourceManager as well as to request for resources from the 
-    Scheduler to complete its tasks. 
+    * The three main protocols for YARN applications (ApplicationClientProtocol,
+      ApplicationMasterProtocol and ContainerManagementProtocol) are still
+      preserved. The three clients wrap these three protocols to provide a
+      simpler programming model for YARN applications.
 
-  * ContainerManager - ApplicationMaster\<--\>NodeManager\
-    The protocol used by the ApplicationMaster to talk to the NodeManager to 
-    start/stop containers and get status updates on the containers if needed. 
+    * Under very rare circumstances, programmers may want to use the three
+      protocols directly to implement an application. However, note that <such
+      behaviors are no longer encouraged for general use cases>.
+
+    []
 
 * Writing a Simple Yarn Application
 
 ** Writing a simple Client
 
-  * The first step that a client needs to do is to connect to the 
-    ResourceManager or to be more specific, the ApplicationsManager (AsM) 
-    interface of the ResourceManager. 
+  * The first step a client needs to take is to initialize and start a
+    YarnClient.
 
 +---+
-    ApplicationClientProtocol applicationsManager; 
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = 
-        NetUtils.createSocketAddr(yarnConf.get(
-            YarnConfiguration.RM_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_ADDRESS));		
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    configuration appsManagerServerConf = new Configuration(conf);
-    appsManagerServerConf.setClass(
-        YarnConfiguration.YARN_SECURITY_INFO,
-        ClientRMSecurityInfo.class, SecurityInfo.class);
-    applicationsManager = ((ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, rmAddress, appsManagerServerConf));    
+  YarnClient yarnClient = YarnClient.createYarnClient();
+  yarnClient.init(conf);
+  yarnClient.start();
 +---+
 
-  * Once a handle is obtained to the ASM, the client needs to request the 
-    ResourceManager for a new ApplicationId. 
+  * Once a client is set up, the client needs to create an application, and get
+    its application id.
 
 +---+
-    GetNewApplicationRequest request = 
-        Records.newRecord(GetNewApplicationRequest.class);		
-    GetNewApplicationResponse response = 
-        applicationsManager.getNewApplication(request);
-    LOG.info("Got new ApplicationId=" + response.getApplicationId());
+  YarnClientApplication app = yarnClient.createApplication();
+  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
 +---+
 
-  * The response from the ASM for a new application also contains information 
-    about the cluster such as the minimum/maximum resource capabilities of the 
-    cluster. This is required so that to ensure that you can correctly set the 
-    specifications of the container in which the ApplicationMaster would be 
-    launched. Please refer to GetNewApplicationResponse for more details. 
+  * The response from the <<<YarnClientApplication>>> for a new application also
+    contains information about the cluster such as the minimum/maximum resource
+    capabilities of the cluster. This is required to ensure that you can
+    correctly set the specifications of the container in which the
+    ApplicationMaster would be launched. Please refer to
+    <<<GetNewApplicationResponse>>> for more details.
+
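+    For example, a client may clip its resource request to the cluster
+    maximum (a sketch, not part of the distributed shell client; <<<amMemory>>>
+    and <<<amVCores>>> are assumed to hold the desired AM resources):
+
++---+
+  Resource maxCap = appResponse.getMaximumResourceCapability();
+  amMemory = Math.min(amMemory, maxCap.getMemory());
+  amVCores = Math.min(amVCores, maxCap.getVirtualCores());
++---+
+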
+  * The main crux of a client is to set up the <<<ApplicationSubmissionContext>>>
+    which defines all the information needed by the RM to launch the AM. A client
+    needs to set the following into the context:
+
+    * Application info: id, name
+
+    * Queue, priority info: Queue to which the application will be submitted,
+      the priority to be assigned for the application.
+
+    * User: The user submitting the application
+
+    * <<<ContainerLaunchContext>>>: The information defining the container in
+      which the AM will be launched and run. The <<<ContainerLaunchContext>>>, as
+      mentioned previously, defines all the required information needed to run
+      the application such as the local <<R>>esources (binaries, jars, files
+      etc.), <<E>>nvironment settings (CLASSPATH etc.), the <<C>>ommand to be
+      executed and security <<T>>okens (<RECT>).
+
+    []
 
-  * The main crux of a client is to setup the ApplicationSubmissionContext 
-    which defines all the information needed by the ResourceManager to launch 
-    the ApplicationMaster. A client needs to set the following into the context: 
-    
-    * Application Info: id, name
-
-    * Queue, Priority info: Queue to which the application will be submitted, 
-      the priority to be assigned for the application. 
-
-    * User: The user submitting the application 
-
-    * ContainerLaunchContext: The information defining the container in which 
-      the ApplicationMaster will be launched and run. The 
-      ContainerLaunchContext, as mentioned previously, defines all the required
-      information needed to run the ApplicationMaster such as the local 
-      resources (binaries, jars, files etc.), security tokens, environment 
-      settings (CLASSPATH etc.) and the command to be executed. 
-       
-    []   
-
-+---+
-    // Create a new ApplicationSubmissionContext
-    ApplicationSubmissionContext appContext = 
-        Records.newRecord(ApplicationSubmissionContext.class);
-    // set the ApplicationId 
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName(appName);
-    
-    // Create a new container launch context for the AM's container
-    ContainerLaunchContext amContainer = 
-        Records.newRecord(ContainerLaunchContext.class);
-
-    // Define the local resources required 
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    // Lets assume the jar we need for our ApplicationMaster is available in 
-    // HDFS at a certain known path to us and we want to make it available to
-    // the ApplicationMaster in the launched container 
-    Path jarPath; // <- known path to jar file  
-    FileStatus jarStatus = fs.getFileStatus(jarPath);
-    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
-    // Set the type of resource - file or archive
-    // archives are untarred at the destination by the framework
-    amJarRsrc.setType(LocalResourceType.FILE);
-    // Set visibility of the resource 
-    // Setting to most private option i.e. this file will only 
-    // be visible to this instance of the running application
-    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
-    // Set the location of resource to be copied over into the 
-    // working directory
-    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); 
-    // Set timestamp and length of file so that the framework 
-    // can do basic sanity checks for the local resource 
-    // after it has been copied over to ensure it is the same 
-    // resource the client intended to use with the application
-    amJarRsrc.setTimestamp(jarStatus.getModificationTime());
-    amJarRsrc.setSize(jarStatus.getLen());
-    // The framework will create a symlink called AppMaster.jar in the 
-    // working directory that will be linked back to the actual file. 
-    // The ApplicationMaster, if needs to reference the jar file, would 
-    // need to use the symlink filename.  
-    localResources.put("AppMaster.jar",  amJarRsrc);    
-    // Set the local resources into the launch context    
-    amContainer.setLocalResources(localResources);
-
-    // Set up the environment needed for the launch context
-    Map<String, String> env = new HashMap<String, String>();    
-    // For example, we could setup the classpath needed.
-    // Assuming our classes or jars are available as local resources in the
-    // working directory from which the command will be run, we need to append
-    // "." to the path. 
-    // By default, all the hadoop specific classpaths will already be available 
-    // in $CLASSPATH, so we should be careful not to overwrite it.   
-    String classPathEnv = "$CLASSPATH:./*:";    
-    env.put("CLASSPATH", classPathEnv);
-    amContainer.setEnvironment(env);
-    
-    // Construct the command to be executed on the launched container 
-    String command = 
-        "${JAVA_HOME}" + /bin/java" +
-        " MyAppMaster" + 
-        " arg1 arg2 arg3" + 
-        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
-        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";                     
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command);
-    // add additional commands if needed		
-
-    // Set the command array into the container spec
-    amContainer.setCommands(commands);
-    
-    // Define the resource requirements for the container
-    // For now, YARN only supports memory so we set the memory 
-    // requirements. 
-    // If the process takes more than its allocated memory, it will 
-    // be killed by the framework. 
-    // Memory being requested for should be less than max capability 
-    // of the cluster and all asks should be a multiple of the min capability. 
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(amMemory);
-    amContainer.setResource(capability);
-    
-    // Set the container launch content into the ApplicationSubmissionContext
-    appContext.setAMContainerSpec(amContainer);
-+---+
-
-  * After the setup process is complete, the client is finally ready to submit 
-    the application to the ASM.  
-     
-+---+
-    // Create the request to send to the ApplicationsManager 
-    SubmitApplicationRequest appRequest = 
-        Records.newRecord(SubmitApplicationRequest.class);
-    appRequest.setApplicationSubmissionContext(appContext);
-
-    // Submit the application to the ApplicationsManager
-    // Ignore the response as either a valid response object is returned on 
-    // success or an exception thrown to denote the failure
-    applicationsManager.submitApplication(appRequest);
-+---+    
-   
-  * At this point, the ResourceManager will have accepted the application and 
-    in the background, will go through the process of allocating a container 
-    with the required specifications and then eventually setting up and 
-    launching the ApplicationMaster on the allocated container. 
-    
-  * There are multiple ways a client can track progress of the actual task. 
-  
-    * It can communicate with the ResourceManager and request for a report of 
-      the application via ApplicationClientProtocol#getApplicationReport. 
-
-+-----+     
-      GetApplicationReportRequest reportRequest = 
-          Records.newRecord(GetApplicationReportRequest.class);
-      reportRequest.setApplicationId(appId);
-      GetApplicationReportResponse reportResponse = 
-          applicationsManager.getApplicationReport(reportRequest);
-      ApplicationReport report = reportResponse.getApplicationReport();
-+-----+             
-  
-      The ApplicationReport received from the ResourceManager consists of the following: 
-      
-        * General application information: ApplicationId, queue to which the 
-          application was submitted, user who submitted the application and the 
-          start time for the application. 
-          
-        * ApplicationMaster details: the host on which the ApplicationMaster is 
-          running, the rpc port (if any) on which it is listening for requests 
-          from clients and a token that the client needs to communicate with 
-          the ApplicationMaster. 
-          
-        * Application tracking information: If the application supports some 
-          form of progress tracking, it can set a tracking url which is 
-          available via ApplicationReport#getTrackingUrl that a client can look 
-          at to monitor progress. 
-          
-        * ApplicationStatus: The state of the application as seen by the 
-          ResourceManager is available via 
-          ApplicationReport#getYarnApplicationState. If the 
-          YarnApplicationState is set to FINISHED, the client should refer to 
-          ApplicationReport#getFinalApplicationStatus to check for the actual 
-          success/failure of the application task itself. In case of failures, 
-          ApplicationReport#getDiagnostics may be useful to shed some more 
-          light on the the failure.      
- 
-    * If the ApplicationMaster supports it, a client can directly query the 
-      ApplicationMaster itself for progress updates via the host:rpcport 
-      information obtained from the ApplicationReport. It can also use the 
-      tracking url obtained from the report if available.
-
-  * In certain situations, if the application is taking too long or due to 
-    other factors, the client may wish to kill the application. The 
-    ApplicationClientProtocol supports the forceKillApplication call that allows a 
-    client to send a kill signal to the ApplicationMaster via the 
-    ResourceManager. An ApplicationMaster if so designed may also support an 
-    abort call via its rpc layer that a client may be able to leverage.
-
-+---+
-    KillApplicationRequest killRequest = 
-        Records.newRecord(KillApplicationRequest.class);		
-    killRequest.setApplicationId(appId);
-    applicationsManager.forceKillApplication(killRequest);	
-+---+
-
-** Writing an ApplicationMaster
-
-  * The ApplicationMaster is the actual owner of the job. It will be launched 
-    by the ResourceManager and via the client will be provided all the necessary 
-    information and resources about the job that it has been tasked with to 
-    oversee and complete.  
-
-  * As the ApplicationMaster is launched within a container that may (likely 
-    will) be sharing a physical host with other containers, given the 
-    multi-tenancy nature, amongst other issues, it cannot make any assumptions 
-    of things like pre-configured ports that it can listen on. 
-  
-  * When the ApplicationMaster starts up, several parameters are made available
-    to it via the environment. These include the ContainerId for the
-    ApplicationMaster container, the application submission time and details
-    about the NodeManager host running the Application Master.
-    Ref ApplicationConstants for parameter names.
-
-  * All interactions with the ResourceManager require an ApplicationAttemptId 
-    (there can be multiple attempts per application in case of failures). The 
-    ApplicationAttemptId can be obtained from the ApplicationMaster
-    containerId. There are helper apis to convert the value obtained from the
-    environment into objects.
-    
 +---+
-    Map<String, String> envs = System.getenv();
-    String containerIdString = 
-        envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
-    if (containerIdString == null) {
-      // container id should always be set in the env by the framework 
-      throw new IllegalArgumentException(
-          "ContainerId not set in the environment");
+  // set the application submission context
+  ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+  ApplicationId appId = appContext.getApplicationId();
+
+  appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
+  appContext.setApplicationName(appName);
+
+  // set local resources for the application master
+  // local files or archives as needed
+  // In this scenario, the jar file for the application master is part of the local resources
+  Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+  LOG.info("Copy App Master jar from local filesystem and add to local environment");
+  // Copy the application master jar to the filesystem
+  // Create a local resource to point to the destination jar path
+  FileSystem fs = FileSystem.get(conf);
+  addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
+      localResources, null);
+
+  // Set the log4j properties if needed
+  if (!log4jPropFile.isEmpty()) {
+    addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
+        localResources, null);
+  }
+
+  // The shell script has to be made available on the final container(s)
+  // where it will be executed.
+  // To do this, we need to first copy into the filesystem that is visible
+  // to the yarn framework.
+  // We do not need to set this as a local resource for the application
+  // master as the application master does not need it.
+  String hdfsShellScriptLocation = "";
+  long hdfsShellScriptLen = 0;
+  long hdfsShellScriptTimestamp = 0;
+  if (!shellScriptPath.isEmpty()) {
+    Path shellSrc = new Path(shellScriptPath);
+    String shellPathSuffix =
+        appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
+    Path shellDst =
+        new Path(fs.getHomeDirectory(), shellPathSuffix);
+    fs.copyFromLocalFile(false, true, shellSrc, shellDst);
+    hdfsShellScriptLocation = shellDst.toUri().toString();
+    FileStatus shellFileStatus = fs.getFileStatus(shellDst);
+    hdfsShellScriptLen = shellFileStatus.getLen();
+    hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
+  }
+
+  if (!shellCommand.isEmpty()) {
+    addToLocalResources(fs, null, shellCommandPath, appId.toString(),
+        localResources, shellCommand);
+  }
+
+  if (shellArgs.length > 0) {
+    addToLocalResources(fs, null, shellArgsPath, appId.toString(),
+        localResources, StringUtils.join(shellArgs, " "));
+  }
+
+  // Set the env variables to be setup in the env where the application master will be run
+  LOG.info("Set the environment for the application master");
+  Map<String, String> env = new HashMap<String, String>();
+
+  // put location of shell script into env
+  // using the env info, the application master will create the correct local resource for the
+  // eventual containers that will be launched to execute the shell scripts
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
+  env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
+
+  // Add AppMaster.jar location to classpath
+  // At some point we should not be required to add
+  // the hadoop specific classpaths to the env.
+  // It should be provided out of the box.
+  // For now setting all required classpaths including
+  // the classpath to "." for the application jar
+  StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
+    .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
+  for (String c : conf.getStrings(
+      YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+      YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
+    classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
+    classPathEnv.append(c.trim());
+  }
+  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
+    "./log4j.properties");
+
+  // Set the necessary command to execute the application master
+  Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+  // Set java executable command
+  LOG.info("Setting up app master command");
+  vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
+  // Set Xmx based on am memory size
+  vargs.add("-Xmx" + amMemory + "m");
+  // Set class name
+  vargs.add(appMasterMainClass);
+  // Set params for Application Master
+  vargs.add("--container_memory " + String.valueOf(containerMemory));
+  vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
+  vargs.add("--num_containers " + String.valueOf(numContainers));
+  vargs.add("--priority " + String.valueOf(shellCmdPriority));
+
+  for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
+    vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
+  }
+  if (debugFlag) {
+    vargs.add("--debug");
+  }
+
+  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+  // Get final command
+  StringBuilder command = new StringBuilder();
+  for (CharSequence str : vargs) {
+    command.append(str).append(" ");
+  }
+
+  LOG.info("Completed setting up app master command " + command.toString());
+  List<String> commands = new ArrayList<String>();
+  commands.add(command.toString());
+
+  // Set up the container launch context for the application master
+  ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
+    localResources, env, commands, null, null, null);
+
+  // Set up resource type requirements
+  // For now, both memory and vcores are supported, so we set memory and
+  // vcores requirements
+  Resource capability = Resource.newInstance(amMemory, amVCores);
+  appContext.setResource(capability);
+
+  // Service data is a binary blob that can be passed to the application
+  // Not needed in this scenario
+  // amContainer.setServiceData(serviceData);
+
+  // Setup security tokens
+  if (UserGroupInformation.isSecurityEnabled()) {
+    // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
+    Credentials credentials = new Credentials();
+    String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
+    if (tokenRenewer == null || tokenRenewer.length() == 0) {
+      throw new IOException(
+        "Can't get Master Kerberos principal for the RM to use as renewer");
     }
-    ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
-    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
-+---+     
-  
-  * After an ApplicationMaster has initialized itself completely, it needs to 
-    register with the ResourceManager via 
-    ApplicationMasterProtocol#registerApplicationMaster. The ApplicationMaster always 
-    communicate via the Scheduler interface of the ResourceManager. 
-  
+
+    // For now, only getting tokens for the default file-system.
+    final Token<?> tokens[] =
+        fs.addDelegationTokens(tokenRenewer, credentials);
+    if (tokens != null) {
+      for (Token<?> token : tokens) {
+        LOG.info("Got dt for " + fs.getUri() + "; " + token);
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    amContainer.setTokens(fsTokens);
+  }
+
+  appContext.setAMContainerSpec(amContainer);
 +---+
-    // Connect to the Scheduler of the ResourceManager. 
-    YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    InetSocketAddress rmAddress = 
-        NetUtils.createSocketAddr(yarnConf.get(
-            YarnConfiguration.RM_SCHEDULER_ADDRESS,
-            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));		
-    LOG.info("Connecting to ResourceManager at " + rmAddress);
-    ApplicationMasterProtocol resourceManager = 
-        (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, conf);
-
-    // Register the AM with the RM
-    // Set the required info into the registration request: 
-    // ApplicationAttemptId, 
-    // host on which the app master is running
-    // rpc port on which the app master accepts requests from the client 
-    // tracking url for the client to track app master progress
-    RegisterApplicationMasterRequest appMasterRequest = 
-        Records.newRecord(RegisterApplicationMasterRequest.class);
-    appMasterRequest.setApplicationAttemptId(appAttemptID);	
-    appMasterRequest.setHost(appMasterHostname);
-    appMasterRequest.setRpcPort(appMasterRpcPort);
-    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
-
-    // The registration response is useful as it provides information about the 
-    // cluster. 
-    // Similar to the GetNewApplicationResponse in the client, it provides 
-    // information about the min/mx resource capabilities of the cluster that 
-    // would be needed by the ApplicationMaster when requesting for containers.
-    RegisterApplicationMasterResponse response = 
-        resourceManager.registerApplicationMaster(appMasterRequest);
-+---+
-     
-  * The ApplicationMaster has to emit heartbeats to the ResourceManager to keep 
-    it informed that the ApplicationMaster is alive and still running. The 
-    timeout expiry interval at the ResourceManager is defined by a config 
-    setting accessible via YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS with the 
-    default being defined by YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS. 
-    The ApplicationMasterProtocol#allocate calls to the ResourceManager count as heartbeats 
-    as it also supports sending progress update information. Therefore, an 
-    allocate call with no containers requested and progress information updated 
-    if any is a valid way for making heartbeat calls to the ResourceManager. 
-    
-  * Based on the task requirements, the ApplicationMaster can ask for a set of 
-    containers to run its tasks on. The ApplicationMaster has to use the 
-    ResourceRequest class to define the following container specifications: 
-    
-    * Hostname: If containers are required to be hosted on a particular rack or 
-      a specific host. '*' is a special value that implies any host will do. 
-      
-    * Resource capability: Currently, YARN only supports memory based resource 
-      requirements so the request should define how much memory is needed. The 
-      value is defined in MB and has to less than the max capability of the 
+
+  * After the setup process is complete, the client is ready to submit
+    the application with specified priority and queue.
+
++---+
+  // Set the priority for the application master
+  Priority pri = Priority.newInstance(amPriority);
+  appContext.setPriority(pri);
+
+  // Set the queue to which this application is to be submitted in the RM
+  appContext.setQueue(amQueue);
+
+  // Submit the application to the applications manager
+  // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+
+  yarnClient.submitApplication(appContext);
++---+
+
+  * At this point, the RM will have accepted the application and in the
+    background, will go through the process of allocating a container with the
+    required specifications and then eventually setting up and launching the AM
+    on the allocated container.
+
+  * There are multiple ways a client can track progress of the actual task.
+
+    * It can communicate with the RM and request for a report of the application
+      via the <<<getApplicationReport()>>> method of <<<YarnClient>>>.
+
++-----+
+  // Get application report for the appId we are interested in
+  ApplicationReport report = yarnClient.getApplicationReport(appId);
++-----+
+
+      The <<<ApplicationReport>>> received from the RM consists of the following:
+
+        * General application information: Application id, queue to which the
+          application was submitted, user who submitted the application and the
+          start time for the application.
+
+        * ApplicationMaster details: the host on which the AM is running, the
+          rpc port (if any) on which it is listening for requests from clients
+          and a token that the client needs to communicate with the AM.
+
+        * Application tracking information: If the application supports some form
+          of progress tracking, it can set a tracking url which is available via
+          <<<ApplicationReport>>>'s <<<getTrackingUrl()>>> method that a client
+          can look at to monitor progress.
+
+        * Application status: The state of the application as seen by the
+          ResourceManager is available via
+          <<<ApplicationReport#getYarnApplicationState>>>. If the
+          <<<YarnApplicationState>>> is set to <<<FINISHED>>>, the client should
+          refer to <<<ApplicationReport#getFinalApplicationStatus>>> to check for
+          the actual success/failure of the application task itself. In case of
+          failures, <<<ApplicationReport#getDiagnostics>>> may be useful to shed
+          some more light on the failure.
+
+    * If the ApplicationMaster supports it, a client can directly query the AM
+      itself for progress updates via the host:rpcport information obtained from
+      the application report. It can also use the tracking url obtained from the
+      report if available.
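+
+      Putting these together, a client may simply poll the report until the
+      application terminates (a sketch of a monitoring method body returning
+      success; the one-second poll interval is arbitrary and
+      InterruptedException handling is omitted):
+
++-----+
+  while (true) {
+    Thread.sleep(1000);
+    ApplicationReport report = yarnClient.getApplicationReport(appId);
+    YarnApplicationState state = report.getYarnApplicationState();
+    if (state == YarnApplicationState.FINISHED) {
+      // The application finished; check whether the task itself succeeded
+      return report.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED;
+    } else if (state == YarnApplicationState.KILLED
+        || state == YarnApplicationState.FAILED) {
+      return false;
+    }
+  }
++-----+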
+
+  * In certain situations, if the application is taking too long or due to other
+    factors, the client may wish to kill the application. <<<YarnClient>>>
+    supports the <<<killApplication>>> call that allows a client to send a kill
+    signal to the AM via the ResourceManager. An ApplicationMaster if so
+    designed may also support an abort call via its rpc layer that a client may
+    be able to leverage.
+
++---+
+  yarnClient.killApplication(appId);
++---+
+
+** Writing an ApplicationMaster (AM)
+
+  * The AM is the actual owner of the job. It will be launched
+    by the RM and, via the client, will be provided all the
+    necessary information and resources about the job that it has been tasked
+    with overseeing and completing.
+
+  * As the AM is launched within a container that may (likely
+    will) be sharing a physical host with other containers, given the
+    multi-tenancy nature, amongst other issues, it cannot make any assumptions
+    about things like pre-configured ports that it can listen on.
+
+  * When the AM starts up, several parameters are made available
+    to it via the environment. These include the <<<ContainerId>>> for the
+    AM container, the application submission time and details
+    about the NM (NodeManager) host running the ApplicationMaster.
+    Ref <<<ApplicationConstants>>> for parameter names.
+
+  * All interactions with the RM require an <<<ApplicationAttemptId>>> (there can
+    be multiple attempts per application in case of failures). The
+    <<<ApplicationAttemptId>>> can be obtained from the AM's container id. There
+    are helper APIs to convert the value obtained from the environment into
+    objects.
+
++---+
+  Map<String, String> envs = System.getenv();
+  String containerIdString =
+      envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
+  if (containerIdString == null) {
+    // container id should always be set in the env by the framework
+    throw new IllegalArgumentException(
+        "ContainerId not set in the environment");
+  }
+  ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+  ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
++---+
+
+  * After an AM has initialized itself completely, we can start the two clients:
+    one to the ResourceManager, and one to the NodeManagers. We set them up with
+    our customized event handlers; we will talk about those event handlers in
+    detail later in this article.
+
++---+
+  AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
+  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
+  amRMClient.init(conf);
+  amRMClient.start();
+
+  containerListener = createNMCallbackHandler();
+  nmClientAsync = new NMClientAsyncImpl(containerListener);
+  nmClientAsync.init(conf);
+  nmClientAsync.start();
++---+
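+
+  Here, <<<createNMCallbackHandler()>>> returns our container event handler. A
+  minimal sketch of such a handler (the callbacks are those of
+  <<<NMClientAsync.CallbackHandler>>>; the log-only bodies are placeholders,
+  not the distributed shell implementation) could be:
+
++---+
+  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      LOG.info("Container started: " + containerId);
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      LOG.info("Status for " + containerId + ": " + containerStatus.getState());
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      LOG.info("Container stopped: " + containerId);
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to start container " + containerId, t);
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to query container " + containerId, t);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop container " + containerId, t);
+    }
+  }
++---+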
+
+  * The AM has to emit heartbeats to the RM to keep it informed that the AM is
+    alive and still running. The timeout expiry interval at the RM is defined by
+    a config setting accessible via
+    <<<YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS>>> with the default being
+    defined by <<<YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS>>>. The
+    ApplicationMaster needs to register itself with the ResourceManager to
+    start heartbeating.
+
++---+
+  // Register self with ResourceManager
+  // This will start heartbeating to the RM
+  appMasterHostname = NetUtils.getHostname();
+  RegisterApplicationMasterResponse response = amRMClient
+      .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
+          appMasterTrackingUrl);
++---+
+
+  * In the response of the registration, the maximum resource capability is
+    included. You may want to use this to check the application's resource
+    requests.
+
++---+
+  // Dump out information about cluster capability as seen by the
+  // resource manager
+  int maxMem = response.getMaximumResourceCapability().getMemory();
+  LOG.info("Max mem capability of resources in this cluster " + maxMem);
+
+  int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
+  LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
+
+  // A resource ask cannot exceed the max.
+  if (containerMemory > maxMem) {
+    LOG.info("Container memory specified above max threshold of cluster."
+        + " Using max value." + ", specified=" + containerMemory + ", max="
+        + maxMem);
+    containerMemory = maxMem;
+  }
+
+  if (containerVirtualCores > maxVCores) {
+    LOG.info("Container virtual cores specified above max threshold of  cluster."
+      + " Using max value." + ", specified=" + containerVirtualCores + ", max="
+      + maxVCores);
+    containerVirtualCores = maxVCores;
+  }
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  LOG.info("Received " + previousAMRunningContainers.size()
+          + " previous AM's running containers on AM registration.");
++---+
+
+  * Based on the task requirements, the AM can ask for a set of containers to run
+    its tasks on. We can now calculate how many containers we need, and request
+    that many containers.
+
++---+
+  List<Container> previousAMRunningContainers =
+      response.getContainersFromPreviousAttempts();
+  LOG.info("Received " + previousAMRunningContainers.size()
+      + " previous AM's running containers on AM registration.");
+
+  int numTotalContainersToRequest =
+      numTotalContainers - previousAMRunningContainers.size();
+  // Setup ask for containers from RM
+  // Send request for containers to RM
+  // Until we get our fully allocated quota, we keep on polling RM for
+  // containers
+  // Keep looping until all the containers are launched and shell script
+  // executed on them ( regardless of success/failure).
+  for (int i = 0; i < numTotalContainersToRequest; ++i) {
+    ContainerRequest containerAsk = setupContainerAskForRM();
+    amRMClient.addContainerRequest(containerAsk);
+  }
++---+
+
+  * In <<<setupContainerAskForRM()>>>, the following two things need to be set up:
+
+    * Resource capability: Currently, YARN supports memory-based resource
+      requirements, so the request should define how much memory is needed. The
+      value is defined in MB and has to be less than the max capability of the
       cluster and an exact multiple of the min capability. Memory resources
-      correspond to physical memory limits imposed on the task containers.
-      
-    * Priority: When asking for sets of containers, an ApplicationMaster may 
-      define different priorities to each set. For example, the Map-Reduce 
-      ApplicationMaster may assign a higher priority to containers needed 
-      for the Map tasks and a lower priority for the Reduce tasks' containers.
-      
-    []     
-       
-+----+ 
-    // Resource Request
-    ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
-
-    // setup requirements for hosts 
-    // whether a particular rack/host is needed 
-    // useful for applications that are sensitive
-    // to data locality 
-    rsrcRequest.setHostName("*");
+      correspond to physical memory limits imposed on the task containers. YARN
+      also supports computation-based resources (vCores), as shown in the code.
 
+    * Priority: When asking for sets of containers, an AM may assign different
+      priorities to each set. For example, the Map-Reduce AM may assign a higher
+      priority to containers needed for the Map tasks and a lower priority for
+      the Reduce tasks' containers.
+
+    []
+
++---+
+  private ContainerRequest setupContainerAskForRM() {
+    // setup requirements for hosts
+    // using * as any host will do for the distributed shell app
     // set the priority for the request
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(requestPriority);
-    rsrcRequest.setPriority(pri);	    
+    Priority pri = Priority.newInstance(requestPriority);
 
     // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(containerMemory);
-    rsrcRequest.setCapability(capability);
-
-    // set no. of containers needed
-    // matching the specifications
-    rsrcRequest.setNumContainers(numContainers);
-+---+
-        
-  * After defining the container requirements, the ApplicationMaster has to 
-    construct an AllocateRequest to send to the ResourceManager. 
-    The AllocateRequest consists of:
-        
-    * Requested containers: The container specifications and the no. of 
-      containers being requested for by the ApplicationMaster from the 
-      ResourceManager. 
-    
-    * Released containers: There may be situations when the ApplicationMaster 
-      may have requested for more containers that it needs or due to failure 
-      issues, decide to use other containers allocated to it. In all such 
-      situations, it is beneficial to the cluster if the ApplicationMaster 
-      releases these containers back to the ResourceManager so that they can be 
-      re-allocated to other applications.   
-    
-    * ResponseId: The response id that will be sent back in the response from 
-      the allocate call.  
-     
-    * Progress update information: The ApplicationMaster can send its progress 
-      update (range between to 0 to 1) to the ResourceManager. 
-    
-    []
-    
+    // For now, memory and CPU are supported so we set memory and cpu requirements
+    Resource capability = Resource.newInstance(containerMemory,
+      containerVirtualCores);
+
+    ContainerRequest request = new ContainerRequest(capability, null, null,
+        pri);
+    LOG.info("Requested container ask: " + request.toString());
+    return request;
+  }
 +---+
-    List<ResourceRequest> requestedContainers;
-    List<ContainerId> releasedContainers    
-    AllocateRequest req = Records.newRecord(AllocateRequest.class);
 
-    // The response id set in the request will be sent back in 
-    // the response so that the ApplicationMaster can 
-    // match it to its original ask and act appropriately.
-    req.setResponseId(rmRequestID);
-    
-    // Set ApplicationAttemptId 
-    req.setApplicationAttemptId(appAttemptID);
-    
-    // Add the list of containers being asked for 
-    req.addAllAsks(requestedContainers);
-    
-    // If the ApplicationMaster has no need for certain 
-    // containers due to over-allocation or for any other
-    // reason, it can release them back to the ResourceManager
-    req.addAllReleases(releasedContainers);
-    
-    // Assuming the ApplicationMaster can track its progress
-    req.setProgress(currentProgress);
-    
-    AllocateResponse allocateResponse = resourceManager.allocate(req);		     
+  * After container allocation requests have been sent by the ApplicationMaster,
+    containers will be launched asynchronously, by the event handler of
+    the <<<AMRMClientAsync>>> client. The handler should implement the
+    <<<AMRMClientAsync.CallbackHandler>>> interface.
+
+    * When there are containers allocated, the handler sets up a thread that runs
+      the code to launch containers. Here we use the name
+      <<<LaunchContainerRunnable>>> to demonstrate. We will talk about the
+      <<<LaunchContainerRunnable>>> class in the following part of this article.
+
 +---+
-    
-  * The AllocateResponse sent back from the ResourceManager provides the 
-    following information:
-  
-    * Reboot flag: For scenarios when the ApplicationMaster may get out of sync 
-      with the ResourceManager. 
-    
-    * Allocated containers: The containers that have been allocated to the 
-      ApplicationMaster.
-    
-    * Headroom: Headroom for resources in the cluster. Based on this information 
-      and knowing its needs, an ApplicationMaster can make intelligent decisions 
-      such as re-prioritizing sub-tasks to take advantage of currently allocated 
-      containers, bailing out faster if resources are not becoming available 
-      etc.         
-    
-    * Completed containers: Once an ApplicationMaster triggers a launch an 
-      allocated container, it will receive an update from the ResourceManager 
-      when the container completes. The ApplicationMaster can look into the 
-      status of the completed container and take appropriate actions such as 
-      re-trying a particular sub-task in case of a failure.
-
-    * Number of cluster nodes: The number of hosts available on the cluster.
-      
-    [] 
-      
-    One thing to note is that containers will not be immediately allocated to 
-    the ApplicationMaster. This does not imply that the ApplicationMaster should 
-    keep on asking the pending count of required containers. Once an allocate 
-    request has been sent, the ApplicationMaster will eventually be allocated 
-    the containers based on cluster capacity, priorities and the scheduling 
-    policy in place. The ApplicationMaster should only request for containers 
-    again if and only if its original estimate changed and it needs additional 
-    containers. 
-
-+---+
-
-    // Retrieve list of allocated containers from the response 
-    // and on each allocated container, lets assume we are launching 
-    // the same job.
-    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
+  @Override
+  public void onContainersAllocated(List<Container> allocatedContainers) {
+    LOG.info("Got response from RM for container ask, allocatedCnt="
+        + allocatedContainers.size());
+    numAllocatedContainers.addAndGet(allocatedContainers.size());
     for (Container allocatedContainer : allocatedContainers) {
-      LOG.info("Launching shell command on a new container."
-          + ", containerId=" + allocatedContainer.getId()
-          + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
-          + ":" + allocatedContainer.getNodeId().getPort()
-          + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-          + ", containerState" + allocatedContainer.getState()
-          + ", containerResourceMemory"  
-          + allocatedContainer.getResource().getMemory());
-          
-          
-      // Launch and start the container on a separate thread to keep the main 
-      // thread unblocked as all containers may not be allocated at one go.
-      LaunchContainerRunnable runnableLaunchContainer = 
-          new LaunchContainerRunnable(allocatedContainer);
-      Thread launchThread = new Thread(runnableLaunchContainer);	
+      LaunchContainerRunnable runnableLaunchContainer =
+          new LaunchContainerRunnable(allocatedContainer, containerListener);
+      Thread launchThread = new Thread(runnableLaunchContainer);
+
+      // launch and start the container on a separate thread to keep
+      // the main thread unblocked
+      // as all containers may not be allocated at one go.
       launchThreads.add(launchThread);
       launchThread.start();
     }
+  }
++---+
 
-    // Check what the current available resources in the cluster are
-    Resource availableResources = allocateResponse.getAvailableResources();
-    // Based on this information, an ApplicationMaster can make appropriate 
-    // decisions
-
-    // Check the completed containers
-    // Let's assume we are keeping a count of total completed containers, 
-    // containers that failed and ones that completed successfully.  			
-    List<ContainerStatus> completedContainers = 
-        allocateResponse.getCompletedContainersStatuses();
-    for (ContainerStatus containerStatus : completedContainers) {				
-      LOG.info("Got container status for containerID= " 
-          + containerStatus.getContainerId()
-          + ", state=" + containerStatus.getState()	
-          + ", exitStatus=" + containerStatus.getExitStatus() 
-          + ", diagnostics=" + containerStatus.getDiagnostics());
-
-      int exitStatus = containerStatus.getExitStatus();
-      if (0 != exitStatus) {
-        // container failed 
-        // -100 is a special case where the container 
-        // was aborted/pre-empted for some reason 
-        if (-100 != exitStatus) {
-          // application job on container returned a non-zero exit code
-          // counts as completed 
-          numCompletedContainers.incrementAndGet();
-          numFailedContainers.incrementAndGet();							
-        }
-        else { 
-          // something else bad happened 
-          // app job did not complete for some reason 
-          // we should re-try as the container was lost for some reason
-          // decrementing the requested count so that we ask for an
-          // additional one in the next allocate call.          
-          numRequestedContainers.decrementAndGet();
-          // we do not need to release the container as that has already 
-          // been done by the ResourceManager/NodeManager. 
-        }
-        }
-        else { 
-          // nothing to do 
-          // container completed successfully 
-          numCompletedContainers.incrementAndGet();
-          numSuccessfulContainers.incrementAndGet();
-        }
-      }
-    }
-+---+      
+    * On each heartbeat, the event handler reports the progress of the
+      application.
 
-    
-  * After a container has been allocated to the ApplicationMaster, it needs to 
-    follow a similar process that the Client followed in setting up the 
-    ContainerLaunchContext for the eventual task that is going to be running on 
-    the allocated Container. Once the ContainerLaunchContext is defined, the 
-    ApplicationMaster can then communicate with the ContainerManager to start 
-    its allocated container.
-       
-+---+
-       
-    //Assuming an allocated Container obtained from AllocateResponse
-    Container container;   
-    // Connect to ContainerManager on the allocated container 
-    String cmIpPortStr = container.getNodeId().getHost() + ":" 
-        + container.getNodeId().getPort();		
-    InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);		
-    ContainerManager cm = 
-        (ContainerManager)rpc.getProxy(ContainerManager.class, cmAddress, conf);     
-
-    // Now we setup a ContainerLaunchContext  
-    ContainerLaunchContext ctx = 
-        Records.newRecord(ContainerLaunchContext.class);
-
-    ctx.setContainerId(container.getId());
-    ctx.setResource(container.getResource());
-
-    try {
-      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
-    } catch (IOException e) {
-      LOG.info(
-          "Getting current user failed when trying to launch the container",
-          + e.getMessage());
-    }
++---+
+  @Override
+  public float getProgress() {
+    // set progress to deliver to RM on next heartbeat
+    float progress = (float) numCompletedContainers.get()
+        / numTotalContainers;
+    return progress;
+  }
++---+
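+
+    * The event handler is also notified of completed containers on heartbeat.
+      A typical handler updates the bookkeeping that feeds <<<getProgress()>>>;
+      a minimal sketch (the counter is the assumed field used above):
+
++---+
+  @Override
+  public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+    for (ContainerStatus status : completedContainers) {
+      LOG.info("Container completed: " + status.getContainerId()
+          + ", exitStatus=" + status.getExitStatus());
+      numCompletedContainers.incrementAndGet();
+    }
+  }
++---+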
 
-    // Set the environment 
-    Map<String, String> unixEnv;
-    // Setup the required env. 
-    // Please note that the launched container does not inherit 
-    // the environment of the ApplicationMaster so all the 
-    // necessary environment settings will need to be re-setup 
-    // for this allocated container.      
-    ctx.setEnvironment(unixEnv);
-
-    // Set the local resources 
-    Map<String, LocalResource> localResources = 
-        new HashMap<String, LocalResource>();
-    // Again, the local resources from the ApplicationMaster is not copied over 
-    // by default to the allocated container. Thus, it is the responsibility 
- 	  // of the ApplicationMaster to setup all the necessary local resources 
- 	  // needed by the job that will be executed on the allocated container. 
-      
-    // Assume that we are executing a shell script on the allocated container 
-    // and the shell script's location in the filesystem is known to us. 
-    Path shellScriptPath; 
-    LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-    shellRsrc.setType(LocalResourceType.FILE);
-    shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
-    shellRsrc.setResource(
-        ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
-    shellRsrc.setTimestamp(shellScriptPathTimestamp);
-    shellRsrc.setSize(shellScriptPathLen);
-    localResources.put("MyExecShell.sh", shellRsrc);
-
-    ctx.setLocalResources(localResources);			
-
-    // Set the necessary command to execute on the allocated container 
-    String command = "/bin/sh ./MyExecShell.sh"
-        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
-        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr";
-
-    List<String> commands = new ArrayList<String>();
-    commands.add(command);
-    ctx.setCommands(commands);
-
-    // Send the start request to the ContainerManager
-    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
-    startReq.setContainerLaunchContext(ctx);
-    cm.startContainer(startReq);
-+---+                
-      
-  * The ApplicationMaster, as mentioned previously, will get updates of 
-    completed containers as part of the response from the ApplicationMasterProtocol#allocate 
-    calls. It can also monitor its launched containers pro-actively by querying 
-    the ContainerManager for the status. 
-    
+    []
+
+  * The container launch thread actually launches the containers on NMs. After a
+    container has been allocated to the AM, it needs to follow a process similar
+    to the one the client followed when setting up the <<<ContainerLaunchContext>>>
+    for the eventual task that is going to run on the allocated Container. Once
+    the <<<ContainerLaunchContext>>> is defined, the AM can start the container
+    through the <<<NMClientAsync>>>.
+
++---+
+  // Set the necessary command to execute on the allocated container
+  Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+  // Set executable command
+  vargs.add(shellCommand);
+  // Set shell script path
+  if (!scriptPath.isEmpty()) {
+    vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
+      : ExecShellStringPath);
+  }
+
+  // Set args for the shell command if any
+  vargs.add(shellArgs);
+  // Add log redirect params
+  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+  // Get final command
+  StringBuilder command = new StringBuilder();
+  for (CharSequence str : vargs) {
+    command.append(str).append(" ");
+  }
+
+  List<String> commands = new ArrayList<String>();
+  commands.add(command.toString());
+
+  // Set up ContainerLaunchContext, setting local resource, environment,
+  // command and token for constructor.
+
+  // Note for tokens: set up tokens for the container too. Today, for normal
+  // shell commands, the container in distributed shell doesn't need any
+  // tokens. We are populating them mainly for NodeManagers to be able to
+  // download any files in the distributed file-system. The tokens are
+  // otherwise also useful in cases where, e.g., one is running a
+  // "hadoop dfs" command inside the distributed shell.
+  ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
+    localResources, shellEnv, commands, null, allTokens.duplicate(), null);
+  containerListener.addContainer(container.getId(), container);
+  nmClientAsync.startContainerAsync(container, ctx);
 +---+
 
 
-    GetContainerStatusRequest statusReq = 
-        Records.newRecord(GetContainerStatusRequest.class);
-    statusReq.setContainerId(container.getId());
-    GetContainerStatusResponse statusResp = cm.getContainerStatus(statusReq);
-    LOG.info("Container Status"
-        + ", id=" + container.getId()
-        + ", status=" + statusResp.getStatus());
-+---+      
+  * The <<<NMClientAsync>>> object, together with its event handler, handles
+    container events, including container start, stop, status updates, and
+    errors.
+  
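+  A minimal sketch of such a callback handler, modeled on the one in the
+  distributed shell example, is shown below; <<<LOG>>> and the container
+  counters are assumed to be fields of the AM, and the method signatures are
+  those of <<<NMClientAsync.CallbackHandler>>>.
+
++---+
+  private class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+    // Containers the AM has asked the NMs to start
+    private final ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentHashMap<ContainerId, Container>();
+
+    public void addContainer(ContainerId containerId, Container container) {
+      containers.putIfAbsent(containerId, container);
+    }
+
+    @Override
+    public void onContainerStarted(ContainerId containerId,
+        Map<String, ByteBuffer> allServiceResponse) {
+      LOG.info("Container " + containerId + " started");
+    }
+
+    @Override
+    public void onContainerStatusReceived(ContainerId containerId,
+        ContainerStatus containerStatus) {
+      LOG.info("Container " + containerId + " status: "
+          + containerStatus.getState());
+    }
+
+    @Override
+    public void onContainerStopped(ContainerId containerId) {
+      containers.remove(containerId);
+    }
+
+    @Override
+    public void onStartContainerError(ContainerId containerId, Throwable t) {
+      // The container could not be started; count it as completed and failed
+      containers.remove(containerId);
+      numCompletedContainers.incrementAndGet();
+      numFailedContainers.incrementAndGet();
+    }
+
+    @Override
+    public void onGetContainerStatusError(ContainerId containerId,
+        Throwable t) {
+      LOG.error("Failed to query the status of container " + containerId, t);
+    }
+
+    @Override
+    public void onStopContainerError(ContainerId containerId, Throwable t) {
+      LOG.error("Failed to stop container " + containerId, t);
+      containers.remove(containerId);
+    }
+  }
++---+
+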
+  * After the ApplicationMaster determines the work is done, it needs to
+    unregister itself through the AM-RM client and then stop the client.
+
++---+
+  try {
+    amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
+  } catch (YarnException ex) {
+    LOG.error("Failed to unregister application", ex);
+  } catch (IOException e) {
+    LOG.error("Failed to unregister application", e);
+  }
+  
+  amRMClient.stop();
++---+
 
 
 ~~** Defining the context in which your code runs
 
 
-~~*** Container Resource Requests 
+~~*** Container Resource Requests
 
 
-~~*** Local Resources 
+~~*** Local Resources
 
 
-~~*** Environment 
+~~*** Environment
 
 
-~~**** Managing the CLASSPATH 
+~~**** Managing the CLASSPATH
 
 
-~~** Security 
+~~** Security
 
 
-* FAQ 
+* FAQ
 
 
-** How can I distribute my application's jars to all of the nodes in the YARN 
+** How can I distribute my application's jars to all of the nodes in the YARN
    cluster that need it?
 
 
-  You can use the LocalResource to add resources to your application request. 
-  This will cause YARN to distribute the resource to the ApplicationMaster node. 
-  If the resource is a tgz, zip, or jar - you can have YARN unzip it. Then, all 
-  you need to do is add the unzipped folder to your classpath. 
-  For example, when creating your application request:
-
-+---+
-    File packageFile = new File(packagePath);
-    Url packageUrl = ConverterUtils.getYarnUrlFromPath(
-        FileContext.getFileContext.makeQualified(new Path(packagePath)));
-
-    packageResource.setResource(packageUrl);
-    packageResource.setSize(packageFile.length());
-    packageResource.setTimestamp(packageFile.lastModified());
-    packageResource.setType(LocalResourceType.ARCHIVE);
-    packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
-
-    resource.setMemory(memory)
-    containerCtx.setResource(resource)
-    containerCtx.setCommands(ImmutableList.of(
-        "java -cp './package/*' some.class.to.Run "
-        + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
-        + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"))
-    containerCtx.setLocalResources(
-        Collections.singletonMap("package", packageResource))
-    appCtx.setApplicationId(appId)
-    appCtx.setUser(user.getShortUserName)
-    appCtx.setAMContainerSpec(containerCtx)
-    request.setApplicationSubmissionContext(appCtx)
-    applicationsManager.submitApplication(request)
-+---+
-
-  As you can see, the setLocalResources command takes a map of names to 
-  resources. The name becomes a sym link in your application's cwd, so you can 
-  just refer to the artifacts inside by using ./package/*. 
-  
-  Note: Java's classpath (cp) argument is VERY sensitive. 
-  Make sure you get the syntax EXACTLY correct.
+  * You can use the <<<LocalResource>>> to add resources to your application
+    request. This will cause YARN to distribute the resource to the
+    ApplicationMaster node. If the resource is a tgz, zip, or jar, you can have
+    YARN unzip it. Then, all you need to do is add the unzipped folder to your
+    classpath. For example, when creating your application request:
 
 
-  Once your package is distributed to your ApplicationMaster, you'll need to 
-  follow the same process whenever your ApplicationMaster starts a new container 
-  (assuming you want the resources to be sent to your container). The code for 
-  this is the same. You just need to make sure that you give your 
-  ApplicationMaster the package path (either HDFS, or local), so that it can 
-  send the resource URL along with the container ctx.
++---+
+  File packageFile = new File(packagePath);
+  URL packageUrl = ConverterUtils.getYarnUrlFromPath(
+      FileContext.getFileContext().makeQualified(new Path(packagePath)));
+
+  LocalResource packageResource = Records.newRecord(LocalResource.class);
+  packageResource.setResource(packageUrl);
+  packageResource.setSize(packageFile.length());
+  packageResource.setTimestamp(packageFile.lastModified());
+  packageResource.setType(LocalResourceType.ARCHIVE);
+  packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+  Resource resource = Records.newRecord(Resource.class);
+  resource.setMemory(memory);
+
+  ContainerLaunchContext containerCtx =
+      Records.newRecord(ContainerLaunchContext.class);
+  containerCtx.setCommands(ImmutableList.of(
+      "java -cp './package/*' some.class.to.Run "
+      + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
+      + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
+  containerCtx.setLocalResources(
+      Collections.singletonMap("package", packageResource));
+
+  // The submitting user is determined from the caller's security context,
+  // so it no longer needs to be set on the submission context.
+  appCtx.setApplicationId(appId);
+  appCtx.setResource(resource);
+  appCtx.setAMContainerSpec(containerCtx);
+  yarnClient.submitApplication(appCtx);
++---+
 
 
-** How do I get the ApplicationMaster's ApplicationAttemptId? 
+  As you can see, the <<<setLocalResources>>> command takes a map of names to
+  resources. The name becomes a symlink in your application's cwd, so you can
+  just refer to the artifacts inside via <<<./package/*>>>.
 
 
+  Note: Java's classpath (cp) argument is VERY sensitive.
+  Make sure you get the syntax EXACTLY correct.
 
 
-  The ApplicationAttemptId will be passed to the ApplicationMaster via the 
-  environment and the value from the environment can be converted into an 
-  ApplicationAttemptId object via the ConverterUtils helper function.
+  Once your package is distributed to your AM, you'll need to follow the same
+  process whenever your AM starts a new container (assuming you want the
+  resources to be sent to your container). The code for this is the same. You
+  just need to make sure that you give your AM the package path (either HDFS or
+  local), so that it can send the resource URL along with the container launch
+  context.
 
 
-** My container is being killed by the Node Manager
+** How do I get the ApplicationMaster's <<<ApplicationAttemptId>>>?
 
 
-  This is likely due to high memory usage exceeding your requested container 
-  memory size. There are a number of reasons that can cause this. First, look 
-  at the process tree that the node manager dumps when it kills your container. 
-  The two things you're interested in are physical memory and virtual memory. 
-  If you have exceeded physical memory limits your app is using too much physical 
-  memory. If you're running a Java app, you can use -hprof to look at what is 
-  taking up space in the heap. If you have exceeded virtual memory, you may
-  need to increase the value of the the cluster-wide configuration variable
-  <<<yarn.nodemanager.vmem-pmem-ratio>>>.
+  * The <<<ApplicationAttemptId>>> will be passed to the AM via the environment,
+    and the value from the environment can be converted into an
+    <<<ApplicationAttemptId>>> object via the <<<ConverterUtils>>> helper
+    function, as sketched below.
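+
+  A minimal sketch, assuming the AM reads the standard <<<CONTAINER_ID>>>
+  environment variable that the NodeManager sets for every container:
+
++---+
+  Map<String, String> envs = System.getenv();
+  String containerIdString =
+      envs.get(ApplicationConstants.Environment.CONTAINER_ID.name());
+  // The AM's container id embeds the application attempt id
+  ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
+  ApplicationAttemptId appAttemptId = containerId.getApplicationAttemptId();
++---+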
 
 
-** How do I include native libraries?
+** Why is my container killed by the NodeManager?
 
 
+  * This is likely due to high memory usage exceeding your requested container
+    memory size. There are a number of reasons that can cause this. First, look
+    at the process tree that the NodeManager dumps when it kills your container.
+    The two things you're interested in are physical memory and virtual memory.
+    If you have exceeded physical memory limits, your app is using too much
+    physical memory. If you're running a Java app, you can use hprof to look at
+    what is taking up space in the heap. If you have exceeded virtual memory, you
+    may need to increase the value of the cluster-wide configuration variable
+    <<<yarn.nodemanager.vmem-pmem-ratio>>>, as sketched below.
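+
+  For example, a sketch of raising the ratio in <<<yarn-site.xml>>>; the value
+  <<<10>>> is only illustrative (the default is <<<2.1>>>):
+
++---+
+  <property>
+    <name>yarn.nodemanager.vmem-pmem-ratio</name>
+    <value>10</value>
+  </property>
++---+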
+
+** How do I include native libraries?
 
 
-  Setting -Djava.library.path on the command line while launching a container 
-  can cause native libraries used by Hadoop to not be loaded correctly and can
-  result in errors. It is cleaner to use LD_LIBRARY_PATH instead.
+  * Setting <<<-Djava.library.path>>> on the command line while launching a
+    container can cause native libraries used by Hadoop to not be loaded
+    correctly and can result in errors. It is cleaner to use
+    <<<LD_LIBRARY_PATH>>> instead, as sketched below.
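+
+  A minimal sketch of doing this when building the container's environment,
+  assuming the native libraries have been localized into the container's
+  working directory (<<<shellEnv>>> is the map later passed to the
+  <<<ContainerLaunchContext>>>):
+
++---+
+  Map<String, String> shellEnv = new HashMap<String, String>();
+  // Environment.PWD.$() expands to the container's working directory
+  // at launch time
+  shellEnv.put("LD_LIBRARY_PATH", ApplicationConstants.Environment.PWD.$());
++---+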
 
 
 * Useful Links
 
 
-  * {{{https://issues.apache.org/jira/secure/attachment/12486023/MapReduce_NextGen_Architecture.pdf}Map Reduce Next Generation Architecture}}
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html}YARN Architecture}}
+
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html}YARN Capacity Scheduler}}
+
+  * {{{http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html}YARN Fair Scheduler}}
+
+* Sample code
 
 
-  * {{{http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/}Map Reduce Next Generation Scheduler}}
+  * YARN distributed shell: in the <<<hadoop-yarn-applications-distributedshell>>>
+    project after you set up your development environment.