|
@@ -18,27 +18,25 @@
|
|
|
|
|
|
package org.apache.hadoop.mapred;
|
|
|
|
|
|
-import java.io.File;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Vector;
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.fs.Path;
|
|
|
import org.apache.hadoop.mapred.TaskLog.LogName;
|
|
|
import org.apache.hadoop.mapreduce.ID;
|
|
|
-import org.apache.hadoop.util.StringUtils;
|
|
|
+import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationConstants;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
|
|
|
|
|
|
public class MapReduceChildJVM {
|
|
|
- private static final String SYSTEM_PATH_SEPARATOR =
|
|
|
- System.getProperty("path.separator");
|
|
|
|
|
|
- private static final Log LOG = LogFactory.getLog(MapReduceChildJVM.class);
|
|
|
-
|
|
|
- private static File getTaskLogFile(String logDir, LogName filter) {
|
|
|
- return new File(logDir, filter.toString());
|
|
|
+ private static String getTaskLogFile(LogName filter) { // Builds the path of the task log file for the given log name, as a plain string.
|
|
|
+ return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + // directory part is the LOG_DIR_EXPANSION_VAR constant, not a concrete directory; presumably expanded to the container log dir at launch -- TODO confirm
|
|
|
+ filter.toString(); // file-name part is the LogName's string form
|
|
|
}
|
|
|
|
|
|
private static String getChildEnv(JobConf jobConf, boolean isMap) {
|
|
@@ -50,32 +48,53 @@ public class MapReduceChildJVM {
|
|
|
jobConf.get(jobConf.MAPRED_TASK_ENV));
|
|
|
}
|
|
|
|
|
|
- public static void setVMEnv(Map<String, String> env,
|
|
|
- List<String> classPaths, String pwd, String containerLogDir,
|
|
|
- String nmLdLibraryPath, Task task, CharSequence applicationTokensFile) {
|
|
|
-
|
|
|
- JobConf conf = task.conf;
|
|
|
-
|
|
|
- // Add classpath.
|
|
|
- CharSequence cp = env.get("CLASSPATH");
|
|
|
- String classpath = StringUtils.join(SYSTEM_PATH_SEPARATOR, classPaths);
|
|
|
- if (null == cp) {
|
|
|
- env.put("CLASSPATH", classpath);
|
|
|
+ private static String getChildLogLevel(JobConf conf, boolean isMap) { // Returns the configured log level for the child task as a string.
|
|
|
+ if (isMap) { // map tasks are looked up under MAP_LOG_LEVEL
|
|
|
+ return conf.get(
|
|
|
+ MRJobConfig.MAP_LOG_LEVEL,
|
|
|
+ JobConf.DEFAULT_LOG_LEVEL.toString() // second argument to conf.get; presumably the value used when the key is unset -- confirm against Configuration.get
|
|
|
+ );
|
|
|
} else {
|
|
|
- env.put("CLASSPATH", classpath + SYSTEM_PATH_SEPARATOR + cp);
|
|
|
+ return conf.get( // reduce tasks are looked up under REDUCE_LOG_LEVEL, with the same second argument as the map branch
|
|
|
+ MRJobConfig.REDUCE_LOG_LEVEL,
|
|
|
+ JobConf.DEFAULT_LOG_LEVEL.toString()
|
|
|
+ );
|
|
|
}
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void setVMEnv(Map<String, String> environment,
|
|
|
+ Task task) {
|
|
|
|
|
|
- /////// Environmental variable LD_LIBRARY_PATH
|
|
|
- StringBuilder ldLibraryPath = new StringBuilder();
|
|
|
+ JobConf conf = task.conf;
|
|
|
|
|
|
- ldLibraryPath.append(nmLdLibraryPath);
|
|
|
- ldLibraryPath.append(SYSTEM_PATH_SEPARATOR);
|
|
|
- ldLibraryPath.append(pwd);
|
|
|
- env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
|
|
|
- /////// Environmental variable LD_LIBRARY_PATH
|
|
|
+ // Shell
|
|
|
+ environment.put(
|
|
|
+ Environment.SHELL.name(),
|
|
|
+ conf.get(
|
|
|
+ MRJobConfig.MAPRED_ADMIN_USER_SHELL,
|
|
|
+ MRJobConfig.DEFAULT_SHELL)
|
|
|
+ );
|
|
|
+
|
|
|
+ // Add pwd to LD_LIBRARY_PATH, add this before adding anything else
|
|
|
+ MRApps.addToEnvironment(
|
|
|
+ environment,
|
|
|
+ Environment.LD_LIBRARY_PATH.name(),
|
|
|
+ Environment.PWD.$());
|
|
|
+
|
|
|
+ // Add the env variables passed by the user & admin
|
|
|
+ String mapredChildEnv = getChildEnv(conf, task.isMapTask());
|
|
|
+ MRApps.setEnvFromInputString(environment, mapredChildEnv);
|
|
|
+ MRApps.setEnvFromInputString(
|
|
|
+ environment,
|
|
|
+ conf.get(
|
|
|
+ MRJobConfig.MAPRED_ADMIN_USER_ENV,
|
|
|
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)
|
|
|
+ );
|
|
|
|
|
|
- // for the child of task jvm, set hadoop.root.logger
|
|
|
- env.put("HADOOP_ROOT_LOGGER", "DEBUG,CLA"); // TODO: Debug
|
|
|
+ // Set logging level
|
|
|
+ environment.put(
|
|
|
+ "HADOOP_ROOT_LOGGER",
|
|
|
+ getChildLogLevel(conf, task.isMapTask()) + ",CLA");
|
|
|
|
|
|
// TODO: The following is useful for instance in streaming tasks. Should be
|
|
|
// set in ApplicationMaster's env by the RM.
|
|
@@ -89,76 +108,69 @@ public class MapReduceChildJVM {
|
|
|
// properties.
|
|
|
long logSize = TaskLog.getTaskLogLength(conf);
|
|
|
Vector<String> logProps = new Vector<String>(4);
|
|
|
- setupLog4jProperties(logProps, logSize, containerLogDir);
|
|
|
+ setupLog4jProperties(logProps, logSize);
|
|
|
Iterator<String> it = logProps.iterator();
|
|
|
StringBuffer buffer = new StringBuffer();
|
|
|
while (it.hasNext()) {
|
|
|
buffer.append(" " + it.next());
|
|
|
}
|
|
|
hadoopClientOpts = hadoopClientOpts + buffer.toString();
|
|
|
-
|
|
|
- env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
|
|
|
+ environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts);
|
|
|
|
|
|
- // add the env variables passed by the user
|
|
|
- String mapredChildEnv = getChildEnv(conf, task.isMapTask());
|
|
|
- if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
|
|
|
- String childEnvs[] = mapredChildEnv.split(",");
|
|
|
- for (String cEnv : childEnvs) {
|
|
|
- String[] parts = cEnv.split("="); // split on '='
|
|
|
- String value = (String) env.get(parts[0]);
|
|
|
- if (value != null) {
|
|
|
- // replace $env with the child's env constructed by tt's
|
|
|
- // example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
|
|
|
- value = parts[1].replace("$" + parts[0], value);
|
|
|
- } else {
|
|
|
- // this key is not configured by the tt for the child .. get it
|
|
|
- // from the tt's env
|
|
|
- // example PATH=$PATH:/tmp
|
|
|
- value = System.getenv(parts[0]); // Get from NM?
|
|
|
- if (value != null) {
|
|
|
- // the env key is present in the tt's env
|
|
|
- value = parts[1].replace("$" + parts[0], value);
|
|
|
- } else {
|
|
|
- // the env key is note present anywhere .. simply set it
|
|
|
- // example X=$X:/tmp or X=/tmp
|
|
|
- value = parts[1].replace("$" + parts[0], "");
|
|
|
- }
|
|
|
- }
|
|
|
- env.put(parts[0], value);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //This should not be set here (If an OS check is requied. moved to ContainerLuanch)
|
|
|
- // env.put("JVM_PID", "`echo $$`");
|
|
|
-
|
|
|
- env.put(Constants.STDOUT_LOGFILE_ENV,
|
|
|
- getTaskLogFile(containerLogDir, TaskLog.LogName.STDOUT).toString());
|
|
|
- env.put(Constants.STDERR_LOGFILE_ENV,
|
|
|
- getTaskLogFile(containerLogDir, TaskLog.LogName.STDERR).toString());
|
|
|
+ // Add stdout/stderr env
|
|
|
+ environment.put(
|
|
|
+ MRJobConfig.STDOUT_LOGFILE_ENV,
|
|
|
+ getTaskLogFile(TaskLog.LogName.STDOUT)
|
|
|
+ );
|
|
|
+ environment.put(
|
|
|
+ MRJobConfig.STDERR_LOGFILE_ENV,
|
|
|
+ getTaskLogFile(TaskLog.LogName.STDERR)
|
|
|
+ );
|
|
|
}
|
|
|
|
|
|
private static String getChildJavaOpts(JobConf jobConf, boolean isMapTask) {
|
|
|
+ String userJavaOpts; // JVM options read from the task-level *_TASK_JAVA_OPTS keys
|
|
|
+ String adminJavaOpts; // JVM options read from the *_ADMIN_JAVA_OPTS keys
|
|
|
if (isMapTask) {
|
|
|
- return jobConf.get(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, jobConf.get(
|
|
|
- JobConf.MAPRED_TASK_JAVA_OPTS,
|
|
|
- JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
|
|
|
+ userJavaOpts =
|
|
|
+ jobConf.get(
|
|
|
+ JobConf.MAPRED_MAP_TASK_JAVA_OPTS, // map-specific key is tried first
|
|
|
+ jobConf.get(
|
|
|
+ JobConf.MAPRED_TASK_JAVA_OPTS, // then the key shared by map and reduce tasks
|
|
|
+ JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
|
|
|
+ );
|
|
|
+ adminJavaOpts =
|
|
|
+ jobConf.get(
|
|
|
+ MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
|
|
|
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
|
|
|
+ } else {
|
|
|
+ userJavaOpts =
|
|
|
+ jobConf.get(
|
|
|
+ JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, // reduce-specific key is tried first
|
|
|
+ jobConf.get(
|
|
|
+ JobConf.MAPRED_TASK_JAVA_OPTS, // then the key shared by map and reduce tasks
|
|
|
+ JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)
|
|
|
+ );
|
|
|
+ adminJavaOpts =
|
|
|
+ jobConf.get(
|
|
|
+ MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
|
|
|
+ MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
|
|
|
}
|
|
|
- return jobConf
|
|
|
- .get(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, jobConf.get(
|
|
|
- JobConf.MAPRED_TASK_JAVA_OPTS,
|
|
|
- JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS));
|
|
|
+
|
|
|
+ // Admin JVM options go first so that user-supplied options can override them.
|
|
|
+ return adminJavaOpts + " " + userJavaOpts;
|
|
|
}
|
|
|
|
|
|
private static void setupLog4jProperties(Vector<String> vargs,
|
|
|
- long logSize, String containerLogDir) {
|
|
|
+ long logSize) { // Appends the log4j-related -D system properties to vargs; the log directory is no longer a parameter.
|
|
|
vargs.add("-Dlog4j.configuration=container-log4j.properties");
|
|
|
- vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + containerLogDir);
|
|
|
+ vargs.add("-Dhadoop.yarn.mr.containerLogDir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); // log dir is passed as the LOG_DIR_EXPANSION_VAR constant; presumably substituted with the real directory at container launch -- TODO confirm
|
|
|
vargs.add("-Dhadoop.yarn.mr.totalLogFileSize=" + logSize);
|
|
|
}
|
|
|
|
|
|
public static List<String> getVMCommand(
|
|
|
- InetSocketAddress taskAttemptListenerAddr, Task task, String javaHome,
|
|
|
- String workDir, String logDir, String childTmpDir, ID jvmID) {
|
|
|
+ InetSocketAddress taskAttemptListenerAddr, Task task,
|
|
|
+ ID jvmID) {
|
|
|
|
|
|
TaskAttemptID attemptID = task.getTaskID();
|
|
|
JobConf conf = task.conf;
|
|
@@ -166,7 +178,7 @@ public class MapReduceChildJVM {
|
|
|
Vector<String> vargs = new Vector<String>(8);
|
|
|
|
|
|
vargs.add("exec");
|
|
|
- vargs.add(javaHome + "/bin/java");
|
|
|
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
|
|
|
|
|
|
// Add child (task) java-vm options.
|
|
|
//
|
|
@@ -199,44 +211,26 @@ public class MapReduceChildJVM {
|
|
|
String javaOpts = getChildJavaOpts(conf, task.isMapTask());
|
|
|
javaOpts = javaOpts.replace("@taskid@", attemptID.toString());
|
|
|
String [] javaOptsSplit = javaOpts.split(" ");
|
|
|
-
|
|
|
- // Add java.library.path; necessary for loading native libraries.
|
|
|
- //
|
|
|
- // 1. We add the 'cwd' of the task to it's java.library.path to help
|
|
|
- // users distribute native libraries via the DistributedCache.
|
|
|
- // 2. The user can also specify extra paths to be added to the
|
|
|
- // java.library.path via mapred.{map|reduce}.child.java.opts.
|
|
|
- //
|
|
|
- String libraryPath = workDir;
|
|
|
- boolean hasUserLDPath = false;
|
|
|
- for(int i=0; i<javaOptsSplit.length ;i++) {
|
|
|
- if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
|
|
|
- // TODO: Does the above take care of escaped space chars
|
|
|
- javaOptsSplit[i] += SYSTEM_PATH_SEPARATOR + libraryPath;
|
|
|
- hasUserLDPath = true;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if(!hasUserLDPath) {
|
|
|
- vargs.add("-Djava.library.path=" + libraryPath);
|
|
|
- }
|
|
|
for (int i = 0; i < javaOptsSplit.length; i++) {
|
|
|
vargs.add(javaOptsSplit[i]);
|
|
|
}
|
|
|
|
|
|
- if (childTmpDir != null) {
|
|
|
- vargs.add("-Djava.io.tmpdir=" + childTmpDir);
|
|
|
- }
|
|
|
+ String childTmpDir = Environment.PWD.$() + Path.SEPARATOR + "tmp";
|
|
|
+ vargs.add("-Djava.io.tmpdir=" + childTmpDir);
|
|
|
|
|
|
// Setup the log4j prop
|
|
|
long logSize = TaskLog.getTaskLogLength(conf);
|
|
|
- setupLog4jProperties(vargs, logSize, logDir);
|
|
|
+ setupLog4jProperties(vargs, logSize);
|
|
|
|
|
|
if (conf.getProfileEnabled()) {
|
|
|
if (conf.getProfileTaskRange(task.isMapTask()
|
|
|
).isIncluded(task.getPartition())) {
|
|
|
- File prof = getTaskLogFile(logDir, TaskLog.LogName.PROFILE);
|
|
|
- vargs.add(String.format(conf.getProfileParams(), prof.toString()));
|
|
|
+ vargs.add(
|
|
|
+ String.format(
|
|
|
+ conf.getProfileParams(),
|
|
|
+ getTaskLogFile(TaskLog.LogName.PROFILE)
|
|
|
+ )
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -249,8 +243,8 @@ public class MapReduceChildJVM {
|
|
|
|
|
|
// Finally add the jvmID
|
|
|
vargs.add(String.valueOf(jvmID.getId()));
|
|
|
- vargs.add("1>" + getTaskLogFile(logDir, TaskLog.LogName.STDERR));
|
|
|
- vargs.add("2>" + getTaskLogFile(logDir, TaskLog.LogName.STDOUT));
|
|
|
+ vargs.add("1>" + getTaskLogFile(TaskLog.LogName.STDERR));
|
|
|
+ vargs.add("2>" + getTaskLogFile(TaskLog.LogName.STDOUT));
|
|
|
|
|
|
// Final commmand
|
|
|
StringBuilder mergedCommand = new StringBuilder();
|