|
@@ -53,6 +53,29 @@ import org.apache.hadoop.mapreduce.JobContext;
|
|
|
* user supplied map and reduce functions.
|
|
|
*/
|
|
|
abstract class TaskRunner extends Thread {
|
|
|
+
|
|
|
+ static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
|
|
|
+ "mapreduce.admin.map.child.java.opts";
|
|
|
+
|
|
|
+ static final String MAPRED_REDUCE_ADMIN_JAVA_OPTS =
|
|
|
+ "mapreduce.admin.reduce.child.java.opts";
|
|
|
+
|
|
|
+ static final String DEFAULT_MAPRED_ADMIN_JAVA_OPTS =
|
|
|
+ "-Djava.net.preferIPv4Stack=true";
|
|
|
+
|
|
|
+ static final String MAPRED_ADMIN_USER_SHELL =
|
|
|
+ "mapreduce.admin.user.shell";
|
|
|
+
|
|
|
+ static final String DEFAULT_SHELL = "/bin/bash";
|
|
|
+
|
|
|
+ static final String MAPRED_ADMIN_USER_HOME_DIR =
|
|
|
+ "mapreduce.admin.user.home.dir";
|
|
|
+
|
|
|
+ static final String DEFAULT_HOME_DIR= "/homes/";
|
|
|
+
|
|
|
+ static final String MAPRED_ADMIN_USER_ENV =
|
|
|
+ "mapreduce.admin.user.env";
|
|
|
+
|
|
|
public static final Log LOG =
|
|
|
LogFactory.getLog(TaskRunner.class);
|
|
|
|
|
@@ -162,6 +185,7 @@ abstract class TaskRunner extends Thread {
|
|
|
|
|
|
// We don't create any symlinks yet, so presence/absence of workDir
|
|
|
// actually on the file system doesn't matter.
|
|
|
+ String user = tip.getUGI().getUserName();
|
|
|
tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
|
|
public Void run() throws IOException {
|
|
|
taskDistributedCacheManager =
|
|
@@ -204,7 +228,7 @@ abstract class TaskRunner extends Thread {
|
|
|
stderr);
|
|
|
|
|
|
Map<String, String> env = new HashMap<String, String>();
|
|
|
- errorInfo = getVMEnvironment(errorInfo, workDir, conf, env, taskid,
|
|
|
+ errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
|
|
|
logSize);
|
|
|
|
|
|
launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
|
|
@@ -510,14 +534,7 @@ abstract class TaskRunner extends Thread {
|
|
|
return classPaths;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * @param errorInfo
|
|
|
- * @param workDir
|
|
|
- * @param env
|
|
|
- * @return
|
|
|
- * @throws Throwable
|
|
|
- */
|
|
|
- private String getVMEnvironment(String errorInfo, File workDir, JobConf conf,
|
|
|
+ private String getVMEnvironment(String errorInfo, String user, File workDir, JobConf conf,
|
|
|
Map<String, String> env, TaskAttemptID taskid, long logSize)
|
|
|
throws Throwable {
|
|
|
StringBuffer ldLibraryPath = new StringBuffer();
|
|
@@ -529,11 +546,11 @@ abstract class TaskRunner extends Thread {
|
|
|
ldLibraryPath.append(oldLdLibraryPath);
|
|
|
}
|
|
|
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
|
|
|
-
|
|
|
+ //update user configured login-shell properties
|
|
|
+ updateUserLoginEnv(errorInfo, user, conf, env);
|
|
|
String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
|
|
|
LOG.debug("putting jobToken file name into environment fn=" + jobTokenFile);
|
|
|
- env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
|
|
|
-
|
|
|
+ env.put(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION, jobTokenFile);
|
|
|
// for the child of task jvm, set hadoop.root.logger
|
|
|
env.put("HADOOP_ROOT_LOGGER","INFO,TLA");
|
|
|
String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS");
|
|
@@ -549,6 +566,30 @@ abstract class TaskRunner extends Thread {
|
|
|
|
|
|
// add the env variables passed by the user
|
|
|
String mapredChildEnv = getChildEnv(conf);
|
|
|
+ return setEnvFromInputString(errorInfo, env, mapredChildEnv);
|
|
|
+ }
|
|
|
+
|
|
|
+ void updateUserLoginEnv(String errorInfo, String user, JobConf config,
|
|
|
+ Map<String, String> env)
|
|
|
+ throws Throwable {
|
|
|
+ env.put("USER",user);
|
|
|
+ env.put("SHELL", config.get(MAPRED_ADMIN_USER_SHELL, DEFAULT_SHELL));
|
|
|
+ env.put("LOGNAME", user);
|
|
|
+ env.put("HOME", config.get(MAPRED_ADMIN_USER_HOME_DIR, DEFAULT_HOME_DIR));
|
|
|
+ // additional user configured login properties
|
|
|
+ String customEnv = config.get(MAPRED_ADMIN_USER_ENV);
|
|
|
+ setEnvFromInputString(errorInfo, env, customEnv);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param errorInfo
|
|
|
+ * @param env
|
|
|
+ * @param mapredChildEnv
|
|
|
+ * @return
|
|
|
+ * @throws Throwable
|
|
|
+ */
|
|
|
+ private String setEnvFromInputString(String errorInfo, Map<String, String> env,
|
|
|
+ String mapredChildEnv) throws Throwable {
|
|
|
if (mapredChildEnv != null && mapredChildEnv.length() > 0) {
|
|
|
String childEnvs[] = mapredChildEnv.split(",");
|
|
|
for (String cEnv : childEnvs) {
|
|
@@ -560,7 +601,7 @@ abstract class TaskRunner extends Thread {
|
|
|
// example LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/tmp
|
|
|
value = parts[1].replace("$" + parts[0], value);
|
|
|
} else {
|
|
|
- // this key is not configured by the tt for the child .. get it
|
|
|
+ // this key is not configured by the tt for the child .. get it
|
|
|
// from the tt's env
|
|
|
// example PATH=$PATH:/tmp
|
|
|
value = System.getenv(parts[0]);
|
|
@@ -576,9 +617,9 @@ abstract class TaskRunner extends Thread {
|
|
|
env.put(parts[0], value);
|
|
|
} catch (Throwable t) {
|
|
|
// set the error msg
|
|
|
- errorInfo = "Invalid User environment settings : " + mapredChildEnv
|
|
|
- + ". Failed to parse user-passed environment param."
|
|
|
- + " Expecting : env1=value1,env2=value2...";
|
|
|
+ errorInfo = "Invalid User environment settings : " + mapredChildEnv
|
|
|
+ + ". Failed to parse user-passed environment param."
|
|
|
+ + " Expecting : env1=value1,env2=value2...";
|
|
|
LOG.warn(errorInfo);
|
|
|
throw t;
|
|
|
}
|