Selaa lähdekoodia

MAPREDUCE-2719. Add a simple, DistributedShell, application to illustrate alternate frameworks on YARN. Contributed by Hitesh Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1177864 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 vuotta sitten
vanhempi
commit
fad230a49d

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -75,6 +75,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2930. Added the ability to be able to generate graphs from the
     state-machine definitions. (Binglin Chang via vinodkv)
 
+    MAPREDUCE-2719. Add a simple, DistributedShell, application to illustrate
+    alternate frameworks on YARN. (Hitesh Shah via acmurthy)
+
   IMPROVEMENTS
 
     MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via

+ 105 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml

@@ -0,0 +1,105 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project>
+  <parent>
+    <artifactId>hadoop-yarn-applications</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
+  <name>hadoop-yarn-applications-distributedshell</name>
+
+  <properties>
+    <install.file>${project.artifact.file}</install.file>
+    <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
+      <scope>test</scope>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <scope>test</scope>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+      <scope>test</scope>
+      <version>${yarn.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+      <version>${yarn.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <!-- strictly speaking, the unit test is really a regression test. It 
+                 needs the main jar to be available to be able to run. -->
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>build-classpath</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>build-classpath</goal>
+            </goals>
+            <configuration>
+              <!-- needed to run the unit test for DS to generate the required classpath 
+                   that is required in the env of the launch container in the mini yarn cluster -->
+              <outputFile>target/classes/yarn-apps-ds-generated-classpath</outputFile>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+
+</project>

+ 831 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -0,0 +1,831 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
+//import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * An ApplicationMaster for executing shell commands on a set of launched containers using the YARN framework. 
+ * 
+ * <p>This class is meant to act as an example on how to write yarn-based application masters. </p>
+ * 
+ * <p> The ApplicationMaster is started on a container by the <code>ResourceManager</code>'s launcher. 
+ * The first thing that the <code>ApplicationMaster</code> needs to do is to connect and register itself with 
+ * the <code>ResourceManager</code>. The registration sets up information within the <code>ResourceManager</code>
+ * regarding what host:port the ApplicationMaster is listening on to provide any form of functionality to a client
+ * as well as a tracking url that a client can use to keep track of status/job history if needed. </p>
+ * 
+ * <p> The <code>ApplicationMaster</code> needs to send a heartbeat to the <code>ResourceManager</code> at regular intervals
+ * to inform the <code>ResourceManager</code> that it is up and alive. The {@link AMRMProtocol#allocate} to the 
+ * <code>ResourceManager</code> from the <code>ApplicationMaster</code> acts as a heartbeat.
+ * 
+ * <p> For the actual handling of the job, the <code>ApplicationMaster</code> has to request for the 
+ * <code>ResourceManager</code> via {@link AllocateRequest} for the required no. of containers using {@link ResourceRequest}
+ * with the necessary resource specifications such as node location, computational (memory/disk/cpu) resource requirements.
+ * The <code>ResourceManager</code> responds with an {@link AllocateResponse} that informs the <code>ApplicationMaster</code> 
+ * of the set of newly allocated containers, completed containers as well as current state of available resources. </p>
+ * 
+ * <p> For each allocated container, the <code>ApplicationMaster</code> can then set up the necessary launch context via 
+ * {@link ContainerLaunchContext} to specify the allocated container id, local resources required by the executable, 
+ * the environment to be setup for the executable, commands to execute, etc. and submit a {@link StartContainerRequest} 
+ * to the {@link ContainerManager} to launch and execute the defined commands on the given allocated container. </p>
+ *  
+ * <p> The <code>ApplicationMaster</code> can monitor the launched container by either querying the <code>ResourceManager</code> 
+ * using {@link AMRMProtocol#allocate} to get updates on completed containers or via the {@link ContainerManager} 
+ * by querying for the status of the allocated container's {@link ContainerId}.
+ * 
+ * <p> After the job has been completed, the <code>ApplicationMaster</code> has to send a {@link FinishApplicationMasterRequest} 
+ * to the <code>ResourceManager</code> to inform it that the <code>ApplicationMaster</code> has been completed. 
+ */
+public class ApplicationMaster {
+
+  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
+
+  // Configuration 
+  private Configuration conf;
+  // YARN RPC to communicate with the Resource Manager or Node Manager
+  private YarnRPC rpc;
+
+  // Handle to communicate with the Resource Manager
+  private AMRMProtocol resourceManager;
+
+  // Application Attempt Id ( combination of attemptId and fail count )
+  private ApplicationAttemptId appAttemptID;
+
+  // TODO
+  // For status update for clients - yet to be implemented
+  // Hostname of the container 
+  private String appMasterHostname = "";
+  // Port on which the app master listens for status update requests from clients
+  private int appMasterRpcPort = 0;
+  // Tracking url to which app master publishes info for clients to monitor 
+  private String appMasterTrackingUrl = "";
+
+  // App Master configuration
+  // No. of containers to run shell command on
+  private int numTotalContainers = 1;
+  // Memory to request for the container on which the shell command will run 
+  private int containerMemory = 10;
+  // Priority of the request
+  private int requestPriority; 
+
+  // Incremental counter for rpc calls to the RM
+  private AtomicInteger rmRequestID = new AtomicInteger();
+
+  // Simple flag to denote whether all works is done
+  private boolean appDone = false; 
+  // Counter for completed containers ( complete denotes successful or failed )
+  private AtomicInteger numCompletedContainers = new AtomicInteger();
+  // Allocated container count so that we know how many containers has the RM
+  // allocated to us
+  private AtomicInteger numAllocatedContainers = new AtomicInteger();
+  // Count of failed containers 
+  private AtomicInteger numFailedContainers = new AtomicInteger();
+  // Count of containers already requested from the RM
+  // Needed as once requested, we should not request for containers again and again. 
+  // Only request for more if the original requirement changes. 
+  private AtomicInteger numRequestedContainers = new AtomicInteger();
+
+  // Shell command to be executed 
+  private String shellCommand = ""; 
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command 
+  private Map<String, String> shellEnv = new HashMap<String, String>();
+
+  // Location of shell script ( obtained from info set in env )
+  // Shell script path in fs
+  private String shellScriptPath = ""; 
+  // Timestamp needed for creating a local resource
+  private long shellScriptPathTimestamp = 0;
+  // File length needed for local resource
+  private long shellScriptPathLen = 0;
+
+  // Hardcoded path to shell script in launch container's local env
+  private final String ExecShellStringPath = "ExecShellScript.sh";
+
+  // Containers to be released
+  private CopyOnWriteArrayList<ContainerId> releasedContainers = new CopyOnWriteArrayList<ContainerId>();
+
+  // Launch threads
+  private List<Thread> launchThreads = new ArrayList<Thread>();	
+
+  /**
+   * @param args Command line args
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      ApplicationMaster appMaster = new ApplicationMaster();
+      LOG.info("Initializing ApplicationMaster");
+      boolean doRun = appMaster.init(args);	
+      if (!doRun) {
+        System.exit(0);
+      }
+      result = appMaster.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running ApplicationMaster", t);
+      System.exit(1);
+    }		
+    if (result) {
+      LOG.info("Application Master completed successfully. exiting");
+      System.exit(0);
+    }
+    else {
+      LOG.info("Application Master failed. exiting");
+      System.exit(2);			
+    }
+  }
+
+  /**
+   * Dump out contents of $CWD and the environment to stdout for debugging
+   */
+  private void dumpOutDebugInfo() {
+
+    LOG.info("Dump debug output");		
+    Map<String, String> envs = System.getenv();
+    for (Map.Entry<String, String> env : envs.entrySet()) {
+      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
+      System.out.println("System env: key=" + env.getKey() + ", val=" + env.getValue());
+    }
+
+    String cmd = "ls -al";
+    Runtime run = Runtime.getRuntime();
+    Process pr = null;
+    try {
+      pr = run.exec(cmd);
+      pr.waitFor();
+
+      BufferedReader buf = new BufferedReader(new InputStreamReader(pr.getInputStream()));
+      String line = "";
+      while ((line=buf.readLine())!=null) {
+        LOG.info("System CWD content: " + line);
+        System.out.println("System CWD content: " + line);
+      }
+      buf.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } 
+  }
+
+  public ApplicationMaster() throws Exception {
+    // Set up the configuration and RPC
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
+  }
+  /**
+   * Parse command line options
+   * @param args Command line args 
+   * @return Whether init successful and run should be invoked 
+   * @throws ParseException
+   * @throws IOException 
+   */
+  public boolean init(String[] args) throws ParseException, IOException {
+
+    Options opts = new Options();
+    opts.addOption("app_attempt_id", true, "App Attempt ID. Not to be used unless for testing purposes");
+    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true, "Location of the shell script to be executed");
+    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("debug", false, "Dump out debug information");
+
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException("No args specified for application master to initialize");
+    }		
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      dumpOutDebugInfo();
+    }
+
+    Map<String, String> envs = System.getenv();
+
+    appAttemptID = Records.newRecord(ApplicationAttemptId.class);
+    if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
+      if (cliParser.hasOption("app_attempt_id")) {
+        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
+        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
+      } 
+      else {
+        throw new IllegalArgumentException("Application Attempt Id not set in the environment");				
+      }	
+    } else {
+      appAttemptID = ConverterUtils.toApplicationAttemptId(envs.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
+    }
+
+    LOG.info("Application master for app"
+        + ", appId=" + appAttemptID.getApplicationId().getId()
+        + ", clustertimestamp=" + appAttemptID.getApplicationId().getClusterTimestamp()
+        + ", attemptId=" + appAttemptID.getAttemptId());
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) { 
+      String shellEnvs[] = cliParser.getOptionValues("shell_env");
+      for (String env : shellEnvs) {
+        env = env.trim();
+        int index = env.indexOf('=');
+        if (index == -1) {
+          shellEnv.put(env, "");
+          continue;
+        }
+        String key = env.substring(0, index);
+        String val = "";
+        if (index < (env.length()-1)) {
+          val = env.substring(index+1);
+        }
+        shellEnv.put(key, val);
+      }
+    }
+
+    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
+      shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
+
+      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {				
+        shellScriptPathTimestamp = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));				
+      }					
+      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
+        shellScriptPathLen = Long.valueOf(envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));				
+      }
+
+      if (!shellScriptPath.isEmpty()
+          && (shellScriptPathTimestamp <= 0 
+          || shellScriptPathLen <= 0)) {
+        LOG.error("Illegal values in env for shell script path"
+            + ", path=" + shellScriptPath
+            + ", len=" + shellScriptPathLen
+            + ", timestamp=" + shellScriptPathTimestamp);	
+        throw new IllegalArgumentException("Illegal values in env for shell script path");
+      }
+    }
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
+    numTotalContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+    requestPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+
+    return true;
+  }
+
+  /**
+   * Helper function to print usage 
+   * @param opts Parsed command line options
+   */
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("ApplicationMaster", opts);		
+  }
+
+  /**
+   * Main run function for the application master
+   * @throws YarnRemoteException
+   */
+  public boolean run() throws YarnRemoteException {
+    LOG.info("Starting ApplicationMaster");
+
+    // Connect to ResourceManager 	
+    resourceManager = connectToRM();
+
+    // Setup local RPC Server to accept status requests directly from clients 
+    // TODO need to setup a protocol for client to be able to communicate to the RPC server 
+    // TODO use the rpc port info to register with the RM for the client to send requests to this app master
+
+    // Register self with ResourceManager 
+    RegisterApplicationMasterResponse response = registerToRM();
+    // Dump out information about cluster capability as seen by the resource manager
+    int minMem = response.getMinimumResourceCapability().getMemory();
+    int maxMem = response.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
+    // a multiple of the min value and cannot exceed the max. 
+    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min		
+    if (containerMemory < minMem) {
+      LOG.info("Container memory specified below min threshold of cluster. Using min value."
+          + ", specified=" + containerMemory
+          + ", min=" + minMem);
+      containerMemory = minMem; 
+    } 
+    else if (containerMemory > maxMem) {
+      LOG.info("Container memory specified above max threshold of cluster. Using max value."
+          + ", specified=" + containerMemory
+          + ", max=" + maxMem);
+      containerMemory = maxMem;
+    }
+
+    // Setup heartbeat emitter 		
+    // TODO poll RM every now and then with an empty request to let RM know that we are alive
+    // The heartbeat interval after which an AM is timed out by the RM is defined by a config setting: 
+    // RM_AM_EXPIRY_INTERVAL_MS with default defined by DEFAULT_RM_AM_EXPIRY_INTERVAL_MS
+    // The allocate calls to the RM count as heartbeats so, for now, this additional heartbeat emitter 
+    // is not required.
+
+    // Setup ask for containers from RM		    
+    // Send request for containers to RM
+    // Until we get our fully allocated quota, we keep on polling RM for containers
+    // Keep looping until all the containers are launched and shell script executed on them 
+    // ( regardless of success/failure). 
+
+    int loopCounter = -1;
+
+    while (numCompletedContainers.get() < numTotalContainers
+        && !appDone) {
+      loopCounter++;		
+
+      // log current state
+      LOG.info("Current application state: loop=" + loopCounter 
+          + ", appDone=" + appDone
+          + ", total=" + numTotalContainers
+          + ", requested=" + numRequestedContainers
+          + ", completed=" + numCompletedContainers
+          + ", failed=" + numFailedContainers
+          + ", currentAllocated=" + numAllocatedContainers);			
+
+      // Sleep before each loop when asking RM for containers
+      // to avoid flooding RM with spurious requests when it 
+      // need not have any available containers 
+      // Sleeping for 1000 ms.
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.info("Sleep interrupted " + e.getMessage());				
+      }
+
+      // No. of containers to request 
+      // For the first loop, askCount will be equal to total containers needed 
+      // From that point on, askCount will always be 0 as current implementation 
+      // does not change its ask on container failures. 
+      int askCount = numTotalContainers - numRequestedContainers.get();
+      numRequestedContainers.addAndGet(askCount);
+
+      // Setup request to be sent to RM to allocate containers
+      List<ResourceRequest> resourceReq = new ArrayList<ResourceRequest>();
+      if (askCount > 0) {
+        ResourceRequest containerAsk = setupContainerAskForRM(askCount);			
+        resourceReq.add(containerAsk);
+      }
+
+      // Send the request to RM 
+      LOG.info("Asking RM for containers"
+          + ", askCount=" + askCount);
+      AMResponse amResp =	sendContainerAskToRM(resourceReq);			
+
+      // Retrieve list of allocated containers from the response 
+      List<Container> allocatedContainers = amResp.getAllocatedContainers();
+      LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size());
+      numAllocatedContainers.addAndGet(allocatedContainers.size());
+      for (Container allocatedContainer : allocatedContainers) {
+        LOG.info("Launching shell command on a new container."
+            + ", containerId=" + allocatedContainer.getId()
+            + ", containerNode=" + allocatedContainer.getNodeId().getHost() 
+            + ":" + allocatedContainer.getNodeId().getPort()
+            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
+            + ", containerState" + allocatedContainer.getState()
+            + ", containerResourceMemory" + allocatedContainer.getResource().getMemory());
+        //						+ ", containerToken" + allocatedContainer.getContainerToken().getIdentifier().toString());
+
+        LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer);
+        Thread launchThread = new Thread(runnableLaunchContainer);	
+
+        // launch and start the container on a separate thread to keep the main thread unblocked
+        // as all containers may not be allocated at one go.
+        launchThreads.add(launchThread);
+        launchThread.start();
+      }
+
+      // Check what the current available resources in the cluster are
+      // TODO should we do anything if the available resources are not enough? 
+      Resource availableResources = amResp.getAvailableResources();
+      LOG.info("Current available resources in the cluster " + availableResources);			
+
+      // Check the completed containers 			
+      List<ContainerStatus> completedContainers = amResp.getCompletedContainersStatuses();
+      LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+      for (ContainerStatus containerStatus : completedContainers) {				
+        LOG.info("Got container status for containerID= " + containerStatus.getContainerId()
+            + ", state=" + containerStatus.getState()	
+            + ", exitStatus=" + containerStatus.getExitStatus() 
+            + ", diagnostics=" + containerStatus.getDiagnostics());
+
+        // non complete containers should not be here 
+        assert(containerStatus.getState() == ContainerState.COMPLETE);
+
+        // increment counters for completed/failed containers
+        int exitStatus = containerStatus.getExitStatus();
+        if (0 != exitStatus) {
+          // container failed 
+          if (-100 != exitStatus) {
+            // shell script failed
+            // counts as completed 
+            numCompletedContainers.incrementAndGet();
+            numFailedContainers.incrementAndGet();							
+          }
+          else { 
+            // something else bad happened 
+            // app job did not complete for some reason 
+            // we should re-try as the container was lost for some reason
+            numAllocatedContainers.decrementAndGet();
+            numRequestedContainers.decrementAndGet();
+            // we do not need to release the container as it would be done
+            // by the RM/CM.
+          }
+        }
+        else { 
+          // nothing to do 
+          // container completed successfully 
+          numCompletedContainers.incrementAndGet();
+          LOG.info("Container completed successfully."
+              + ", containerId=" + containerStatus.getContainerId());
+        }
+
+      }
+      if (numCompletedContainers.get() == numTotalContainers) {
+        appDone = true;
+      }
+
+      LOG.info("Current application state: loop=" + loopCounter
+          + ", appDone=" + appDone
+          + ", total=" + numTotalContainers					
+          + ", requested=" + numRequestedContainers
+          + ", completed=" + numCompletedContainers
+          + ", failed=" + numFailedContainers
+          + ", currentAllocated=" + numAllocatedContainers);	
+
+      // TODO 
+      // Add a timeout handling layer 
+      // for misbehaving shell commands			
+    }
+
+    // Join all launched threads
+    // needed for when we time out 
+    // and we need to release containers
+    for (Thread launchThread : launchThreads) {
+      try {
+        launchThread.join(10000);
+      } catch (InterruptedException e) {
+        LOG.info("Exception thrown in thread join: " + e.getMessage());
+        e.printStackTrace();
+      }			
+    }
+
+    // When the application completes, it should send a finish application signal 
+    // to the RM
+    LOG.info("Application completed. Signalling finish to RM");
+
+    FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+    finishReq.setAppAttemptId(appAttemptID);
+    boolean isSuccess = true;
+    if (numFailedContainers.get() == 0) {
+      finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+    }
+    else {
+      finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
+      String diagnostics = "Diagnostics."
+          + ", total=" + numTotalContainers
+          + ", completed=" + numCompletedContainers.get()
+          + ", allocated=" + numAllocatedContainers.get()
+          + ", failed=" + numFailedContainers.get();
+      finishReq.setDiagnostics(diagnostics);
+      isSuccess = false;
+    }
+    resourceManager.finishApplicationMaster(finishReq);
+    return isSuccess;
+  }
+
+  /**
+   * Thread to connect to the {@link ContainerManager} and 
+   * launch the container that will execute the shell command. 
+   */
+  private class LaunchContainerRunnable implements Runnable {
+
+    // Allocated container 
+    Container container;
+    // Handle to communicate with ContainerManager
+    ContainerManager cm;
+
+    /**
+     * @param lcontainer Allocated container
+     */
+    public LaunchContainerRunnable(Container lcontainer) {
+      this.container = lcontainer;
+    }
+
+    /**
+     * Helper function to connect to CM
+     */
+    private void connectToCM() {
+      String cmIpPortStr = container.getNodeId().getHost() + ":" 
+          + container.getNodeId().getPort();		
+      InetSocketAddress cmAddress = NetUtils.createSocketAddr(cmIpPortStr);		
+      LOG.info("Connecting to ResourceManager at " + cmIpPortStr);
+      this.cm = ((ContainerManager) rpc.getProxy(ContainerManager.class, cmAddress, conf));
+    }
+
+
+    @Override
+    /**
+     * Connects to CM, sets up container launch context 
+     * for shell command and eventually dispatches the container 
+     * start request to the CM. 
+     */
+    public void run() {
+      // Connect to ContainerManager 
+      LOG.info("Connecting to container manager for containerid=" + container.getId());
+      connectToCM();
+
+      LOG.info("Setting up container launch container for containerid=" + container.getId());
+      ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+      ctx.setContainerId(container.getId());
+      ctx.setResource(container.getResource());
+
+      try {
+        ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+      } catch (IOException e) {
+        LOG.info("Getting current user info failed when trying to launch the container"
+            + e.getMessage());
+      }
+
+      // Set the environment 
+      ctx.setEnvironment(shellEnv);
+
+      // Set the local resources 
+      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+      // The container for the eventual shell commands needs its own local resources too. 
+      // In this scenario, if a shell script is specified, we need to have it copied 
+      // and made available to the container. 
+      if (!shellScriptPath.isEmpty()) {
+        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
+        shellRsrc.setType(LocalResourceType.FILE);
+        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
+        try {
+          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(shellScriptPath)));
+        } catch (URISyntaxException e) {
+          LOG.error("Error when trying to use shell script path specified in env"
+              + ", path=" + shellScriptPath);
+          e.printStackTrace();
+
+          // A failure scenario on bad input such as invalid shell script path 
+          // We know we cannot continue launching the container 
+          // so we should release it. 															 					
+          // TODO
+          numCompletedContainers.incrementAndGet();
+          numFailedContainers.incrementAndGet();
+          return;					
+        }
+        shellRsrc.setTimestamp(shellScriptPathTimestamp);
+        shellRsrc.setSize(shellScriptPathLen);
+        localResources.put(ExecShellStringPath, shellRsrc);
+      }			
+      ctx.setLocalResources(localResources);			
+
+      // Set the necessary command to execute on the allocated container 
+      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
+      // Set executable command 
+      vargs.add(shellCommand);
+      // Set shell script path 
+      if (!shellScriptPath.isEmpty()) {
+        vargs.add(ExecShellStringPath);
+      }
+
+      // Set args for the shell command if any			
+      vargs.add(shellArgs);
+      // Add log redirect params
+      // TODO
+      // We should redirect the output to hdfs instead of local logs 
+      // so as to be able to look at the final output after the containers 
+      // have been released. 
+      // Could use a path suffixed with /AppId/AppAttempId/ContainerId/std[out|err] 
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+      // Get final commmand
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+      ctx.setCommands(commands);
+
+      StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+      startReq.setContainerLaunchContext(ctx);
+      try {
+        cm.startContainer(startReq);
+      } catch (YarnRemoteException e) {
+        LOG.info("Start container failed for :"
+            + ", containerId=" + container.getId());
+        e.printStackTrace();
+        // TODO do we need to release this container? 
+      }
+
+      // Get container status?
+      // Left commented out as the shell scripts are short lived 
+      // and we are relying on the status for completed containers from RM to detect status
+
+      //		    GetContainerStatusRequest statusReq = Records.newRecord(GetContainerStatusRequest.class);
+      //		    statusReq.setContainerId(container.getId());
+      //		    GetContainerStatusResponse statusResp;
+      //			try {
+      //				statusResp = cm.getContainerStatus(statusReq);
+      //			    LOG.info("Container Status"
+      //			    		+ ", id=" + container.getId()
+      //			    		+ ", status=" +statusResp.getStatus());
+      //			} catch (YarnRemoteException e) {
+      //				e.printStackTrace();
+      //			}
+    }		
+  }	
+
+  /**
+   * Connect to the Resource Manager
+   * @return Handle to communicate with the RM
+   */
+  private AMRMProtocol connectToRM() {
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));		
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    return ((AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf));
+  }		
+
+  /** 
+   * Register the Application Master to the Resource Manager
+   * @return the registration response from the RM
+   * @throws YarnRemoteException
+   */
+  private RegisterApplicationMasterResponse registerToRM() throws YarnRemoteException {		
+    RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);	
+
+    // set the required info into the registration request: 
+    // application attempt id, 
+    // host on which the app master is running
+    // rpc port on which the app master accepts requests from the client 
+    // tracking url for the app master
+    appMasterRequest.setApplicationAttemptId(appAttemptID);	
+    appMasterRequest.setHost(appMasterHostname);
+    appMasterRequest.setRpcPort(appMasterRpcPort);
+    appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+    return resourceManager.registerApplicationMaster(appMasterRequest);
+  }
+
+  /**
+   * Setup the request that will be sent to the RM for the container ask.
+   * @param numContainers Containers to ask for from RM
+   * @return the setup ResourceRequest to be sent to RM
+   */
+  private ResourceRequest setupContainerAskForRM(int numContainers) {
+    ResourceRequest request = Records.newRecord(ResourceRequest.class);
+
+    // setup requirements for hosts 
+    // whether a particular rack/host is needed 
+    // Refer to apis under org.apache.hadoop.net for more 
+    // details on how to get figure out rack/host mapping.
+    // using * as any host will do for the distributed shell app
+    request.setHostName("*");
+
+    // set no. of containers needed
+    request.setNumContainers(numContainers);
+
+    // set the priority for the request
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide? 
+    pri.setPriority(requestPriority);
+    request.setPriority(pri);	    
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(containerMemory);
+    request.setCapability(capability);
+
+    return request;
+  }
+
+  /**
+   * Ask RM to allocate given no. of containers to this Application Master
+   * @param requestedContainers Containers to ask for from RM
+   * @return Response from RM to AM with allocated containers 
+   * @throws YarnRemoteException
+   */
+  private AMResponse sendContainerAskToRM(List<ResourceRequest> requestedContainers)
+      throws YarnRemoteException {	
+    AllocateRequest req = Records.newRecord(AllocateRequest.class);
+    req.setResponseId(rmRequestID.incrementAndGet());
+    req.setApplicationAttemptId(appAttemptID);
+    req.addAllAsks(requestedContainers);
+    req.addAllReleases(releasedContainers);
+    req.setProgress((float)numCompletedContainers.get()/numTotalContainers);
+
+    LOG.info("Sending request to RM for containers"
+        + ", requestedSet=" + requestedContainers.size()
+        + ", releasedSet=" + releasedContainers.size()
+        + ", progress=" + req.getProgress());
+
+    for (ResourceRequest  rsrcReq : requestedContainers) {
+      LOG.info("Requested container ask: " + rsrcReq.toString());
+    }
+    for (ContainerId id : releasedContainers) {
+      LOG.info("Released container, id=" + id.getId());
+    }
+
+    AllocateResponse resp = resourceManager.allocate(req);		
+    return resp.getAMResponse();		    
+  }
+}

+ 791 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java

@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Vector;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.client.ClientRMSecurityInfo;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Client for Distributed Shell application submission to YARN.
+ * 
+ * <p> The distributed shell client allows an application master to be launched that in turn would run 
+ * the provided shell command on a set of containers. </p>
+ * 
+ * <p>This client is meant to act as an example on how to write yarn-based applications. </p>
+ * 
+ * <p> To submit an application, a client first needs to connect to the <code>ResourceManager</code> 
+ * aka ApplicationsManager or ASM via the {@link ClientRMProtocol}. The {@link ClientRMProtocol} 
+ * provides a way for the client to get access to cluster information and to request for a
+ * new {@link ApplicationId}. <p>
+ * 
+ * <p> For the actual job submission, the client first has to create an {@link ApplicationSubmissionContext}. 
+ * The {@link ApplicationSubmissionContext} defines the application details such as {@link ApplicationId} 
+ * and application name, user submitting the application, the priority assigned to the application and the queue 
+ * to which this application needs to be assigned. In addition to this, the {@link ApplicationSubmissionContext}
+ * also defines the {@link ContainerLaunchContext} which describes the <code>Container</code> with which 
+ * the {@link ApplicationMaster} is launched. </p>
+ * 
+ * <p> The {@link ContainerLaunchContext} in this scenario defines the resources to be allocated for the 
+ * {@link ApplicationMaster}'s container, the local resources (jars, configuration files) to be made available 
+ * and the environment to be set for the {@link ApplicationMaster} and the commands to be executed to run the 
+ * {@link ApplicationMaster}. <p>
+ * 
+ * <p> Using the {@link ApplicationSubmissionContext}, the client submits the application to the 
+ * <code>ResourceManager</code> and then monitors the application by requesting the <code>ResourceManager</code> 
+ * for an {@link ApplicationReport} at regular time intervals. In case of the application taking too long, the client 
+ * kills the application by submitting a {@link KillApplicationRequest} to the <code>ResourceManager</code>. </p>
+ *
+ */
+public class Client {
+
+  private static final Log LOG = LogFactory.getLog(Client.class);
+
+  // Configuration
+  private Configuration conf;
+
+  // RPC to communicate to RM
+  private YarnRPC rpc;
+
+  // Handle to talk to the Resource Manager/Applications Manager
+  private ClientRMProtocol applicationsManager;
+
+  // Application master specific info to register a new Application with RM/ASM
+  private String appName = "";
+  // App master priority
+  private int amPriority = 0;
+  // Queue for App master
+  private String amQueue = "";
+  // User to run app master as
+  private String amUser = "";
+  // Amt. of memory resource to request for to run the App Master
+  private int amMemory = 10; 
+
+  // Application master jar file
+  private String appMasterJar = ""; 
+  // Main class to invoke application master
+  private String appMasterMainClass = "";
+
+  // Shell command to be executed 
+  private String shellCommand = ""; 
+  // Location of shell script 
+  private String shellScriptPath = ""; 
+  // Args to be passed to the shell command
+  private String shellArgs = "";
+  // Env variables to be setup for the shell command 
+  private Map<String, String> shellEnv = new HashMap<String, String>();
+  // Shell Command Container priority 
+  private int shellCmdPriority = 0;
+
+  // Amt of memory to request for container in which shell script will be executed
+  private int containerMemory = 10; 
+  // No. of containers in which the shell script needs to be executed
+  private int numContainers = 1;
+
+  // log4j.properties file 
+  // if available, add to local resources and set into classpath 
+  private String log4jPropFile = "";	
+
+  // Start time for client
+  private final long clientStartTime = System.currentTimeMillis();
+  // Timeout threshold for client. Kill app after time interval expires.
+  private long clientTimeout = 600000;
+
+  // Debug flag
+  boolean debugFlag = false;	
+
+  /**
+   * @param args Command line arguments 
+   */
+  public static void main(String[] args) {
+    boolean result = false;
+    try {
+      Client client = new Client();
+      LOG.info("Initializing Client");
+      boolean doRun = client.init(args);
+      if (!doRun) {
+        System.exit(0);
+      }
+      result = client.run();
+    } catch (Throwable t) {
+      LOG.fatal("Error running CLient", t);
+      System.exit(1);
+    }
+    if (result) {
+      LOG.info("Application completed successfully");
+      System.exit(0);			
+    } 
+    LOG.error("Application failed to complete successfully");
+    System.exit(2);
+  }
+
+  /**
+   */
+  public Client() throws Exception  {
+    // Set up the configuration and RPC
+    conf = new Configuration();
+    rpc = YarnRPC.create(conf);
+  }
+
+  /**
+   * Helper function to print out usage
+   * @param opts Parsed command line options 
+   */
+  private void printUsage(Options opts) {
+    new HelpFormatter().printHelp("Client", opts);
+  }
+
+  /**
+   * Parse command line options
+   * @param args Parsed command line options 
+   * @return Whether the init was successful to run the client
+   */
+  public boolean init(String[] args) throws ParseException {
+
+    Options opts = new Options();
+    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
+    opts.addOption("priority", true, "Application Priority. Default 0");
+    opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
+    opts.addOption("user", true, "User to run the application as");
+    opts.addOption("timeout", true, "Application timeout in milliseconds");
+    opts.addOption("master_memory", true, "Amount of memory in MB to be requested to run the application master");
+    opts.addOption("jar", true, "Jar file containing the application master");
+    opts.addOption("class", true, "Main class to  be run for the Application Master.");
+    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
+    opts.addOption("shell_script", true, "Location of the shell script to be executed");
+    opts.addOption("shell_args", true, "Command line args for the shell script");
+    opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
+    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");		
+    opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
+    opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
+    opts.addOption("log_properties", true, "log4j.properties file");
+    opts.addOption("debug", false, "Dump out debug information");
+    opts.addOption("help", false, "Print usage");
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+
+    if (args.length == 0) {
+      printUsage(opts);
+      throw new IllegalArgumentException("No args specified for client to initialize");
+    }		
+
+    if (cliParser.hasOption("help")) {
+      printUsage(opts);
+      return false;
+    }
+
+    if (cliParser.hasOption("debug")) {
+      debugFlag = true;
+
+    }
+
+    appName = cliParser.getOptionValue("appname", "DistributedShell");
+    amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
+    amQueue = cliParser.getOptionValue("queue", "");
+    amUser = cliParser.getOptionValue("user", "");
+    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));		
+
+    if (amMemory < 0) {
+      throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+          + " Specified memory=" + amMemory);
+    }
+
+    if (!cliParser.hasOption("jar")) {
+      throw new IllegalArgumentException("No jar file specified for application master");
+    }		
+
+    appMasterJar = cliParser.getOptionValue("jar");
+    appMasterMainClass = cliParser.getOptionValue("class",
+        "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster");		
+
+    if (!cliParser.hasOption("shell_command")) {
+      throw new IllegalArgumentException("No shell command specified to be executed by application master");
+    }
+    shellCommand = cliParser.getOptionValue("shell_command");
+
+    if (cliParser.hasOption("shell_script")) {
+      shellScriptPath = cliParser.getOptionValue("shell_script");
+    }
+    if (cliParser.hasOption("shell_args")) {
+      shellArgs = cliParser.getOptionValue("shell_args");
+    }
+    if (cliParser.hasOption("shell_env")) { 
+      String envs[] = cliParser.getOptionValues("shell_env");
+      for (String env : envs) {
+        env = env.trim();
+        int index = env.indexOf('=');
+        if (index == -1) {
+          shellEnv.put(env, "");
+          continue;
+        }
+        String key = env.substring(0, index);
+        String val = "";
+        if (index < (env.length()-1)) {
+          val = env.substring(index+1);
+        }
+        shellEnv.put(key, val);
+      }
+    }
+    shellCmdPriority = Integer.parseInt(cliParser.getOptionValue("shell_cmd_priority", "0"));
+
+    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
+    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
+
+    if (containerMemory < 0 || numContainers < 1) {
+      throw new IllegalArgumentException("Invalid no. of containers or container memory specified, exiting."
+          + " Specified containerMemory=" + containerMemory
+          + ", numContainer=" + numContainers);
+    }
+
+    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
+
+    log4jPropFile = cliParser.getOptionValue("log_properties", "");
+
+    return true;
+  }
+
+  /**
+   * Main run function for the client
+   * @return true if application completed successfully
+   * @throws IOException
+   */
+  public boolean run() throws IOException {
+    LOG.info("Starting Client");
+
+    // Connect to ResourceManager 	
+    connectToASM();
+    assert(applicationsManager != null);		
+
+    // Use ClientRMProtocol handle to general cluster information 
+    GetClusterMetricsRequest clusterMetricsReq = Records.newRecord(GetClusterMetricsRequest.class);
+    GetClusterMetricsResponse clusterMetricsResp = applicationsManager.getClusterMetrics(clusterMetricsReq);
+    LOG.info("Got Cluster metric info from ASM" 
+        + ", numNodeManagers=" + clusterMetricsResp.getClusterMetrics().getNumNodeManagers());
+
+    GetClusterNodesRequest clusterNodesReq = Records.newRecord(GetClusterNodesRequest.class);
+    GetClusterNodesResponse clusterNodesResp = applicationsManager.getClusterNodes(clusterNodesReq);
+    LOG.info("Got Cluster node info from ASM");
+    for (NodeReport node : clusterNodesResp.getNodeReports()) {
+      LOG.info("Got node report from ASM for"
+          + ", nodeId=" + node.getNodeId() 
+          + ", nodeAddress" + node.getHttpAddress()
+          + ", nodeRackName" + node.getRackName()
+          + ", nodeNumContainers" + node.getNumContainers()
+          + ", nodeHealthStatus" + node.getNodeHealthStatus());
+    }
+
+    GetQueueInfoRequest queueInfoReq = Records.newRecord(GetQueueInfoRequest.class);
+    GetQueueInfoResponse queueInfoResp = applicationsManager.getQueueInfo(queueInfoReq);		
+    QueueInfo queueInfo = queueInfoResp.getQueueInfo();
+    LOG.info("Queue info"
+        + ", queueName=" + queueInfo.getQueueName()
+        + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+        + ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+        + ", queueApplicationCount=" + queueInfo.getApplications().size()
+        + ", queueChildQueueCount=" + queueInfo.getChildQueues().size());		
+
+    GetQueueUserAclsInfoRequest queueUserAclsReq = Records.newRecord(GetQueueUserAclsInfoRequest.class);
+    GetQueueUserAclsInfoResponse queueUserAclsResp = applicationsManager.getQueueUserAcls(queueUserAclsReq);				
+    List<QueueUserACLInfo> listAclInfo = queueUserAclsResp.getUserAclsInfoList();
+    for (QueueUserACLInfo aclInfo : listAclInfo) {
+      for (QueueACL userAcl : aclInfo.getUserAcls()) {
+        LOG.info("User ACL Info for Queue"
+            + ", queueName=" + aclInfo.getQueueName()			
+            + ", userAcl=" + userAcl.name());
+      }
+    }		
+
+    // Get a new application id 
+    GetNewApplicationResponse newApp = getApplication();
+    ApplicationId appId = newApp.getApplicationId();
+
+    // TODO get min/max resource capabilities from RM and change memory ask if needed
+    // If we do not have min/max, we may not be able to correctly request 
+    // the required resources from the RM for the app master
+    // Memory ask has to be a multiple of min and less than max. 
+    // Dump out information about cluster capability as seen by the resource manager
+    int minMem = newApp.getMinimumResourceCapability().getMemory();
+    int maxMem = newApp.getMaximumResourceCapability().getMemory();
+    LOG.info("Min mem capabililty of resources in this cluster " + minMem);
+    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+    // A resource ask has to be atleast the minimum of the capability of the cluster, the value has to be 
+    // a multiple of the min value and cannot exceed the max. 
+    // If it is not an exact multiple of min, the RM will allocate to the nearest multiple of min
+    if (amMemory < minMem) {
+      LOG.info("AM memory specified below min threshold of cluster. Using min value."
+          + ", specified=" + amMemory
+          + ", min=" + minMem);
+      amMemory = minMem; 
+    } 
+    else if (amMemory > maxMem) {
+      LOG.info("AM memory specified above max threshold of cluster. Using max value."
+          + ", specified=" + amMemory
+          + ", max=" + maxMem);
+      amMemory = maxMem;
+    }				
+
+    // Create launch context for app master
+    LOG.info("Setting up application submission context for ASM");
+    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+    // set the application id 
+    appContext.setApplicationId(appId);
+    // set the application name
+    appContext.setApplicationName(appName);
+
+    // Set up the container launch context for the application master
+    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+    // set local resources for the application master
+    // local files or archives as needed
+    // In this scenario, the jar file for the application master is part of the local resources			
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    LOG.info("Copy App Master jar from local filesystem and add to local environment");
+    // Copy the application master jar to the filesystem 
+    // Create a local resource to point to the destination jar path 
+    FileSystem fs = FileSystem.get(conf);
+    Path src = new Path(appMasterJar);
+    String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";	    
+    Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+    fs.copyFromLocalFile(false, true, src, dst);
+    FileStatus destStatus = fs.getFileStatus(dst);
+    LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+
+    // Set the type of resource - file or archive
+    // archives are untarred at destination
+    // we don't need the jar file to be untarred for now
+    amJarRsrc.setType(LocalResourceType.FILE);
+    // Set visibility of the resource 
+    // Setting to most private option
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
+    // Set the resource to be copied over
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)); 
+    // Set timestamp and length of file so that the framework 
+    // can do basic sanity checks for the local resource 
+    // after it has been copied over to ensure it is the same 
+    // resource the client intended to use with the application
+    amJarRsrc.setTimestamp(destStatus.getModificationTime());
+    amJarRsrc.setSize(destStatus.getLen());
+    localResources.put("AppMaster.jar",  amJarRsrc);
+
+    // Set the log4j properties if needed 
+    if (!log4jPropFile.isEmpty()) {
+      Path log4jSrc = new Path(log4jPropFile);
+      Path log4jDst = new Path(fs.getHomeDirectory(), "log4j.props");
+      fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
+      FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
+      LocalResource log4jRsrc = Records.newRecord(LocalResource.class);
+      log4jRsrc.setType(LocalResourceType.FILE);
+      log4jRsrc.setVisibility(LocalResourceVisibility.APPLICATION);	   
+      log4jRsrc.setResource(ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()));
+      log4jRsrc.setTimestamp(log4jFileStatus.getModificationTime());
+      log4jRsrc.setSize(log4jFileStatus.getLen());
+      localResources.put("log4j.properties", log4jRsrc);
+    }			
+
+    // The shell script has to be made available on the final container(s)
+    // where it will be executed. 
+    // To do this, we need to first copy into the filesystem that is visible 
+    // to the yarn framework. 
+    // We do not need to set this as a local resource for the application 
+    // master as the application master does not need it. 		
+    String hdfsShellScriptLocation = ""; 
+    long hdfsShellScriptLen = 0;
+    long hdfsShellScriptTimestamp = 0;
+    if (!shellScriptPath.isEmpty()) {
+      Path shellSrc = new Path(shellScriptPath);
+      String shellPathSuffix = appName + "/" + appId.getId() + "/ExecShellScript.sh";
+      Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix);
+      fs.copyFromLocalFile(false, true, shellSrc, shellDst);
+      hdfsShellScriptLocation = shellDst.toUri().toString(); 
+      FileStatus shellFileStatus = fs.getFileStatus(shellDst);
+      hdfsShellScriptLen = shellFileStatus.getLen();
+      hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
+    }
+
+    // Set local resource info into app master container launch context
+    amContainer.setLocalResources(localResources);
+
+    // Set the necessary security tokens as needed
+    //amContainer.setContainerTokens(containerToken);
+
+    // Set the env variables to be setup in the env where the application master will be run
+    LOG.info("Set the environment for the application master");
+    Map<String, String> env = new HashMap<String, String>();
+
+    // put location of shell script into env
+    // using the env info, the application master will create the correct local resource for the 
+    // eventual containers that will be launched to execute the shell scripts
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
+    env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
+
+    // Add AppMaster.jar location to classpath 		
+    // At some point we should not be required to add 
+    // the hadoop specific classpaths to the env. 
+    // It should be provided out of the box. 
+    // For now setting all required classpaths including
+    // the classpath to "." for the application jar
+    String classPathEnv = "${CLASSPATH}"
+        + ":./*"
+        + ":$HADOOP_CONF_DIR"
+        + ":$HADOOP_COMMON_HOME/share/hadoop/common/*"
+        + ":$HADOOP_COMMON_HOME/share/hadoop/common/lib/*"
+        + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/*"
+        + ":$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*"
+        + ":$YARN_HOME/modules/*"
+        + ":$YARN_HOME/lib/*"
+        + ":./log4j.properties:";
+
+    // add the runtime classpath needed for tests to work 
+    String testRuntimeClassPath = Client.getTestRuntimeClasspath();
+    classPathEnv += ":" + testRuntimeClassPath; 
+
+    env.put("CLASSPATH", classPathEnv);
+
+    amContainer.setEnvironment(env);
+
+    // Set the necessary command to execute the application master 
+    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+    // Set java executable command 
+    LOG.info("Setting up app master command");
+    vargs.add("${JAVA_HOME}" + "/bin/java");
+    // Set class name 
+    vargs.add(appMasterMainClass);
+    // Set params for Application Master
+    vargs.add("--container_memory " + String.valueOf(containerMemory));
+    vargs.add("--num_containers " + String.valueOf(numContainers));
+    vargs.add("--priority " + String.valueOf(shellCmdPriority));
+    if (!shellCommand.isEmpty()) {
+      vargs.add("--shell_command " + shellCommand + "");
+    }
+    if (!shellArgs.isEmpty()) {
+      vargs.add("--shell_args " + shellArgs + "");
+    }
+    for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
+      vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
+    }			
+    if (debugFlag) {
+      vargs.add("--debug");
+    }
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+    // Get final commmand
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up app master command " + command.toString());	   
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());		
+    amContainer.setCommands(commands);
+
+    // For launching an AM Container, setting user here is not needed
+    // Set user in ApplicationSubmissionContext
+    // amContainer.setUser(amUser);
+
+    // Set up resource type requirements
+    // For now, only memory is supported so we set memory requirements
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(amMemory);
+    amContainer.setResource(capability);
+
+    // Service data is a binary blob that can be passed to the application
+    // Not needed in this scenario
+    // amContainer.setServiceData(serviceData);
+
+    // The following are not required for launching an application master 
+    // amContainer.setContainerId(containerId);		
+
+    appContext.setAMContainerSpec(amContainer);
+
+    // Set the priority for the application master
+    Priority pri = Records.newRecord(Priority.class);
+    // TODO - what is the range for priority? how to decide? 
+    pri.setPriority(amPriority);
+    appContext.setPriority(pri);
+
+    // Set the queue to which this application is to be submitted in the RM
+    appContext.setQueue(amQueue);
+    // Set the user submitting this application 
+    // TODO can it be empty? 
+    appContext.setUser(amUser);
+
+    // Create the request to send to the applications manager 
+    SubmitApplicationRequest appRequest = Records.newRecord(SubmitApplicationRequest.class);
+    appRequest.setApplicationSubmissionContext(appContext);
+
+    // Submit the application to the applications manager
+    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
+    // Ignore the response as either a valid response object is returned on success 
+    // or an exception thrown to denote some form of a failure
+    LOG.info("Submitting application to ASM");
+    applicationsManager.submitApplication(appRequest);
+
+    // TODO
+    // Try submitting the same request again
+    // app submission failure?
+
+    // Monitor the application
+    return monitorApplication(appId);
+
+  }
+
+  /**
+   * Monitor the submitted application for completion. 
+   * Kill application if time expires. 
+   * @param appId Application Id of application to be monitored
+   * @return true if application completed successfully
+   * @throws YarnRemoteException
+   */
+  private boolean monitorApplication(ApplicationId appId) throws YarnRemoteException {
+
+    while (true) {
+
+      // Check app status every 1 second.
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.debug("Thread sleep in monitoring loop interrupted");
+      }
+
+      // Get application report for the appId we are interested in 
+      GetApplicationReportRequest reportRequest = Records.newRecord(GetApplicationReportRequest.class);
+      reportRequest.setApplicationId(appId);
+      GetApplicationReportResponse reportResponse = applicationsManager.getApplicationReport(reportRequest);
+      ApplicationReport report = reportResponse.getApplicationReport();
+
+      LOG.info("Got application report from ASM for"
+          + ", appId=" + appId.getId()
+          + ", clientToken=" + report.getClientToken()
+          + ", appDiagnostics=" + report.getDiagnostics()
+          + ", appMasterHost=" + report.getHost()
+          + ", appQueue=" + report.getQueue()
+          + ", appMasterRpcPort=" + report.getRpcPort()
+          + ", appStartTime=" + report.getStartTime()
+          + ", yarnAppState=" + report.getYarnApplicationState().toString()
+          + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+          + ", appTrackingUrl=" + report.getTrackingUrl()
+          + ", appUser=" + report.getUser());
+
+      YarnApplicationState state = report.getYarnApplicationState();
+      FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+      if (YarnApplicationState.FINISHED == state) {
+        if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+          LOG.info("Application has completed successfully. Breaking monitoring loop");
+          return true;        
+        }
+        else {
+          LOG.info("Application did finished unsuccessfully."
+              + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+              + ". Breaking monitoring loop");
+          return false;
+        }			  
+      }
+      else if (YarnApplicationState.KILLED == state	
+          || YarnApplicationState.FAILED == state) {
+        LOG.info("Application did not finish."
+            + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString()
+            + ". Breaking monitoring loop");
+        return false;
+      }			
+
+      if (System.currentTimeMillis() > (clientStartTime + clientTimeout)) {
+        LOG.info("Reached client specified timeout for application. Killing application");
+        killApplication(appId);
+        return false;				
+      }
+    }			
+
+  }
+
+  /**
+   * Kill a submitted application by sending a call to the ASM
+   * @param appId Application Id to be killed. 
+   * @throws YarnRemoteException
+   */
+  private void killApplication(ApplicationId appId) throws YarnRemoteException {
+    KillApplicationRequest request = Records.newRecord(KillApplicationRequest.class);		
+    // TODO clarify whether multiple jobs with the same app id can be submitted and be running at 
+    // the same time. 
+    // If yes, can we kill a particular attempt only?
+    request.setApplicationId(appId);
+    // KillApplicationResponse response = applicationsManager.forceKillApplication(request);		
+    // Response can be ignored as it is non-null on success or 
+    // throws an exception in case of failures
+    applicationsManager.forceKillApplication(request);	
+  }
+
+  /**
+   * Connect to the Resource Manager/Applications Manager
+   * @return Handle to communicate with the ASM
+   * @throws IOException 
+   */
+  private void connectToASM() throws IOException {
+
+    /*
+		UserGroupInformation user = UserGroupInformation.getCurrentUser();
+		applicationsManager = user.doAs(new PrivilegedAction<ClientRMProtocol>() {
+			public ClientRMProtocol run() {
+				InetSocketAddress rmAddress = NetUtils.createSocketAddr(conf.get(
+					YarnConfiguration.RM_SCHEDULER_ADDRESS,
+					YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));		
+				LOG.info("Connecting to ResourceManager at " + rmAddress);
+				Configuration appsManagerServerConf = new Configuration(conf);
+				appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
+				ClientRMSecurityInfo.class, SecurityInfo.class);
+				ClientRMProtocol asm = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf));
+				return asm;
+			}
+		});
+     */
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+        YarnConfiguration.RM_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_ADDRESS));		
+    LOG.info("Connecting to ResourceManager at " + rmAddress);
+    Configuration appsManagerServerConf = new Configuration(conf);
+    appsManagerServerConf.setClass(YarnConfiguration.YARN_SECURITY_INFO,
+        ClientRMSecurityInfo.class, SecurityInfo.class);
+    applicationsManager = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, appsManagerServerConf));		
+  }		
+
+  /**
+   * Get a new application from the ASM 
+   * @return New Application
+   * @throws YarnRemoteException
+   */
+  private GetNewApplicationResponse getApplication() throws YarnRemoteException {
+    GetNewApplicationRequest request = Records.newRecord(GetNewApplicationRequest.class);		
+    GetNewApplicationResponse response = applicationsManager.getNewApplication(request);
+    LOG.info("Got new application id=" + response.getApplicationId());		
+    return response;		
+  }
+
+  private static String getTestRuntimeClasspath() {
+
+    InputStream classpathFileStream = null;
+    BufferedReader reader = null;
+    String envClassPath = "";
+
+    LOG.info("Trying to generate classpath for app master from current thread's classpath");
+    try {
+
+      // Create classpath from generated classpath
+      // Check maven ppom.xml for generated classpath info
+      // Works if compile time env is same as runtime. Mainly tests.
+      ClassLoader thisClassLoader =
+          Thread.currentThread().getContextClassLoader();
+      String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
+      classpathFileStream =
+          thisClassLoader.getResourceAsStream(generatedClasspathFile);
+      if (classpathFileStream == null) {
+        LOG.info("Could not classpath resource from class loader");
+        return envClassPath;
+      }
+      LOG.info("Readable bytes from stream=" + classpathFileStream.available());
+      reader = new BufferedReader(new InputStreamReader(classpathFileStream));
+      String cp = reader.readLine();
+      if (cp != null) {
+        envClassPath += cp.trim() + ":";
+      }
+      // Put the file itself on classpath for tasks.
+      envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
+    } catch (IOException e) {
+      LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
+    } 
+
+    try {
+      if (classpathFileStream != null) {
+        classpathFileStream.close();
+      }
+      if (reader != null) {
+        reader.close();
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
+    } 
+    return envClassPath;
+  }			
+
+}

+ 42 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/DSConstants.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+/**
+ * Constants used in both Client and Application Master
+ */
+public class DSConstants {
+
+  /**
+   * Environment key name pointing to the shell script's location
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTLOCATION = "DISTRIBUTEDSHELLSCRIPTLOCATION";
+
+  /**
+   * Environment key name denoting the file timestamp for the shell script. 
+   * Used to validate the local resource. 
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTTIMESTAMP = "DISTRIBUTEDSHELLSCRIPTTIMESTAMP";
+
+  /**
+   * Environment key name denoting the file content length for the shell script. 
+   * Used to validate the local resource. 
+   */
+  public static final String DISTRIBUTEDSHELLSCRIPTLEN = "DISTRIBUTEDSHELLSCRIPTLEN";
+}

+ 94 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestDistributedShell {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestDistributedShell.class);
+
+  protected static MiniYARNCluster yarnCluster = null;
+  protected static Configuration conf = new Configuration();
+
+  protected static String APPMASTER_JAR = "../hadoop-yarn-applications-distributedshell/target/hadoop-yarn-applications-distributedshell-0.24.0-SNAPSHOT.jar";
+
+  @BeforeClass
+  public static void setup() throws InterruptedException, IOException {
+    LOG.info("Starting up YARN cluster");
+    if (yarnCluster == null) {
+      yarnCluster = new MiniYARNCluster(TestDistributedShell.class.getName());
+      yarnCluster.init(conf);
+      yarnCluster.start();
+    }
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
+    }	
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    if (yarnCluster != null) {
+      yarnCluster.stop();
+      yarnCluster = null;
+    }
+  }
+
+  @Test
+  public void testDSShell() throws Exception {
+
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        "ls",
+        "--master_memory",
+        "1536",
+        "--container_memory",
+        "1536"				
+    };
+
+    LOG.info("Initializing DS Client");
+    Client client = new Client();
+    boolean initSuccess = client.init(args);
+    assert(initSuccess);
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+
+    LOG.info("Client run completed. Result=" + result);
+    assert (result == true);		 
+
+  }
+
+
+}
+

+ 30 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/pom.xml

@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project>
+  <parent>
+    <artifactId>hadoop-yarn</artifactId>
+    <groupId>org.apache.hadoop</groupId>
+    <version>${yarn.version}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-yarn-applications</artifactId>
+  <name>hadoop-yarn-applications</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>hadoop-yarn-applications-distributedshell</module>
+  </modules>
+</project>

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/pom.xml

@@ -424,5 +424,6 @@
     <module>hadoop-yarn-api</module>
     <module>hadoop-yarn-common</module>
     <module>hadoop-yarn-server</module>
+    <module>hadoop-yarn-applications</module>
   </modules>
 </project>