|
@@ -30,9 +30,11 @@ import java.util.Vector;
|
|
|
import org.apache.commons.cli.CommandLine;
|
|
|
import org.apache.commons.cli.GnuParser;
|
|
|
import org.apache.commons.cli.HelpFormatter;
|
|
|
+import org.apache.commons.cli.Option;
|
|
|
import org.apache.commons.cli.Options;
|
|
|
import org.apache.commons.cli.ParseException;
|
|
|
import org.apache.commons.io.IOUtils;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -137,7 +139,7 @@ public class Client {
|
|
|
// Location of shell script
|
|
|
private String shellScriptPath = "";
|
|
|
// Args to be passed to the shell command
|
|
|
- private String shellArgs = "";
|
|
|
+ private String[] shellArgs = new String[] {};
|
|
|
// Env variables to be setup for the shell command
|
|
|
private Map<String, String> shellEnv = new HashMap<String, String>();
|
|
|
// Shell Command Container priority
|
|
@@ -166,6 +168,8 @@ public class Client {
|
|
|
private Options opts;
|
|
|
|
|
|
private final String shellCommandPath = "shellCommands";
|
|
|
+ private final String shellArgsPath = "shellArgs";
|
|
|
+ private final String appMasterJarPath = "AppMaster.jar";
|
|
|
// Hardcoded path to custom log_properties
|
|
|
private final String log4jPath = "log4j.properties";
|
|
|
|
|
@@ -223,7 +227,9 @@ public class Client {
|
|
|
opts.addOption("jar", true, "Jar file containing the application master");
|
|
|
opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
|
|
|
opts.addOption("shell_script", true, "Location of the shell script to be executed");
|
|
|
- opts.addOption("shell_args", true, "Command line args for the shell script");
|
|
|
+ opts.addOption("shell_args", true, "Command line args for the shell script." +
|
|
|
+ "Multiple args can be separated by empty space.");
|
|
|
+ opts.getOption("shell_args").setArgs(Option.UNLIMITED_VALUES);
|
|
|
opts.addOption("shell_env", true, "Environment for shell script. Specified as env_key=env_val pairs");
|
|
|
opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
|
|
|
opts.addOption("container_memory", true, "Amount of memory in MB to be requested to run the shell command");
|
|
@@ -311,7 +317,7 @@ public class Client {
|
|
|
shellScriptPath = cliParser.getOptionValue("shell_script");
|
|
|
}
|
|
|
if (cliParser.hasOption("shell_args")) {
|
|
|
- shellArgs = cliParser.getOptionValue("shell_args");
|
|
|
+ shellArgs = cliParser.getOptionValues("shell_args");
|
|
|
}
|
|
|
if (cliParser.hasOption("shell_env")) {
|
|
|
String envs[] = cliParser.getOptionValues("shell_env");
|
|
@@ -440,43 +446,13 @@ public class Client {
|
|
|
// Copy the application master jar to the filesystem
|
|
|
// Create a local resource to point to the destination jar path
|
|
|
FileSystem fs = FileSystem.get(conf);
|
|
|
- Path src = new Path(appMasterJar);
|
|
|
- String pathSuffix = appName + "/" + appId.getId() + "/AppMaster.jar";
|
|
|
- Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
|
|
|
- fs.copyFromLocalFile(false, true, src, dst);
|
|
|
- FileStatus destStatus = fs.getFileStatus(dst);
|
|
|
- LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
|
|
|
-
|
|
|
- // Set the type of resource - file or archive
|
|
|
- // archives are untarred at destination
|
|
|
- // we don't need the jar file to be untarred for now
|
|
|
- amJarRsrc.setType(LocalResourceType.FILE);
|
|
|
- // Set visibility of the resource
|
|
|
- // Setting to most private option
|
|
|
- amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
|
|
|
- // Set the resource to be copied over
|
|
|
- amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
|
|
|
- // Set timestamp and length of file so that the framework
|
|
|
- // can do basic sanity checks for the local resource
|
|
|
- // after it has been copied over to ensure it is the same
|
|
|
- // resource the client intended to use with the application
|
|
|
- amJarRsrc.setTimestamp(destStatus.getModificationTime());
|
|
|
- amJarRsrc.setSize(destStatus.getLen());
|
|
|
- localResources.put("AppMaster.jar", amJarRsrc);
|
|
|
+ addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.getId(),
|
|
|
+ localResources, null);
|
|
|
|
|
|
// Set the log4j properties if needed
|
|
|
if (!log4jPropFile.isEmpty()) {
|
|
|
- Path log4jSrc = new Path(log4jPropFile);
|
|
|
- String log4jPathSuffix = appName + "/" + appId.getId() + "/" + log4jPath;
|
|
|
- Path log4jDst = new Path(fs.getHomeDirectory(), log4jPathSuffix);
|
|
|
- fs.copyFromLocalFile(false, true, log4jSrc, log4jDst);
|
|
|
- FileStatus log4jFileStatus = fs.getFileStatus(log4jDst);
|
|
|
- LocalResource log4jRsrc =
|
|
|
- LocalResource.newInstance(
|
|
|
- ConverterUtils.getYarnUrlFromURI(log4jDst.toUri()),
|
|
|
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
|
|
|
- log4jFileStatus.getLen(), log4jFileStatus.getModificationTime());
|
|
|
- localResources.put(log4jPath, log4jRsrc);
|
|
|
+ addToLocalResources(fs, log4jPropFile, log4jPath, appId.getId(),
|
|
|
+ localResources, null);
|
|
|
}
|
|
|
|
|
|
// The shell script has to be made available on the final container(s)
|
|
@@ -500,25 +476,13 @@ public class Client {
|
|
|
}
|
|
|
|
|
|
if (!shellCommand.isEmpty()) {
|
|
|
- String shellCommandSuffix =
|
|
|
- appName + "/" + appId.getId() + "/" + shellCommandPath;
|
|
|
- Path shellCommandDst =
|
|
|
- new Path(fs.getHomeDirectory(), shellCommandSuffix);
|
|
|
- FSDataOutputStream ostream = null;
|
|
|
- try {
|
|
|
- ostream = FileSystem
|
|
|
- .create(fs, shellCommandDst, new FsPermission((short) 0710));
|
|
|
- ostream.writeUTF(shellCommand);
|
|
|
- } finally {
|
|
|
- IOUtils.closeQuietly(ostream);
|
|
|
- }
|
|
|
- FileStatus scFileStatus = fs.getFileStatus(shellCommandDst);
|
|
|
- LocalResource scRsrc =
|
|
|
- LocalResource.newInstance(
|
|
|
- ConverterUtils.getYarnUrlFromURI(shellCommandDst.toUri()),
|
|
|
- LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
|
|
|
- scFileStatus.getLen(), scFileStatus.getModificationTime());
|
|
|
- localResources.put(shellCommandPath, scRsrc);
|
|
|
+ addToLocalResources(fs, null, shellCommandPath, appId.getId(),
|
|
|
+ localResources, shellCommand);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (shellArgs.length > 0) {
|
|
|
+ addToLocalResources(fs, null, shellArgsPath, appId.getId(),
|
|
|
+ localResources, StringUtils.join(shellArgs, " "));
|
|
|
}
|
|
|
// Set local resource info into app master container launch context
|
|
|
amContainer.setLocalResources(localResources);
|
|
@@ -579,9 +543,6 @@ public class Client {
|
|
|
vargs.add("--num_containers " + String.valueOf(numContainers));
|
|
|
vargs.add("--priority " + String.valueOf(shellCmdPriority));
|
|
|
|
|
|
- if (!shellArgs.isEmpty()) {
|
|
|
- vargs.add("--shell_args " + shellArgs + "");
|
|
|
- }
|
|
|
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
|
|
|
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
|
|
|
}
|
|
@@ -750,4 +711,31 @@ public class Client {
|
|
|
yarnClient.killApplication(appId);
|
|
|
}
|
|
|
|
|
|
+ private void addToLocalResources(FileSystem fs, String fileSrcPath,
|
|
|
+ String fileDstPath, int appId, Map<String, LocalResource> localResources,
|
|
|
+ String resources) throws IOException {
|
|
|
+ String suffix =
|
|
|
+ appName + "/" + appId + "/" + fileDstPath;
|
|
|
+ Path dst =
|
|
|
+ new Path(fs.getHomeDirectory(), suffix);
|
|
|
+ if (fileSrcPath == null) {
|
|
|
+ FSDataOutputStream ostream = null;
|
|
|
+ try {
|
|
|
+ ostream = FileSystem
|
|
|
+ .create(fs, dst, new FsPermission((short) 0710));
|
|
|
+ ostream.writeUTF(resources);
|
|
|
+ } finally {
|
|
|
+ IOUtils.closeQuietly(ostream);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ fs.copyFromLocalFile(new Path(fileSrcPath), dst);
|
|
|
+ }
|
|
|
+ FileStatus scFileStatus = fs.getFileStatus(dst);
|
|
|
+ LocalResource scRsrc =
|
|
|
+ LocalResource.newInstance(
|
|
|
+ ConverterUtils.getYarnUrlFromURI(dst.toUri()),
|
|
|
+ LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
|
|
|
+ scFileStatus.getLen(), scFileStatus.getModificationTime());
|
|
|
+ localResources.put(fileDstPath, scRsrc);
|
|
|
+ }
|
|
|
}
|