|
@@ -227,7 +227,7 @@ public class LocalJobRunner implements ClientProtocol {
|
|
|
info.getSplitIndex(), 1);
|
|
|
map.setUser(UserGroupInformation.getCurrentUser().
|
|
|
getShortUserName());
|
|
|
- setupChildMapredLocalDirs(localJobDir, map, localConf);
|
|
|
+ setupChildMapredLocalDirs(map, localConf);
|
|
|
|
|
|
MapOutputFile mapOutput = new MROutputFiles();
|
|
|
mapOutput.setConf(localConf);
|
|
@@ -305,7 +305,7 @@ public class LocalJobRunner implements ClientProtocol {
|
|
|
reduceId, taskId, mapIds.size(), 1);
|
|
|
reduce.setUser(UserGroupInformation.getCurrentUser().
|
|
|
getShortUserName());
|
|
|
- setupChildMapredLocalDirs(localJobDir, reduce, localConf);
|
|
|
+ setupChildMapredLocalDirs(reduce, localConf);
|
|
|
reduce.setLocalMapFiles(mapOutputFiles);
|
|
|
|
|
|
if (!Job.this.isInterrupted()) {
|
|
@@ -958,16 +958,18 @@ public class LocalJobRunner implements ClientProtocol {
|
|
|
throw new UnsupportedOperationException("Not supported");
|
|
|
}
|
|
|
|
|
|
- static void setupChildMapredLocalDirs(Path localJobDir, Task t, JobConf conf) {
|
|
|
+ static void setupChildMapredLocalDirs(Task t, JobConf conf) {
|
|
|
String[] localDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
|
|
|
+ String jobId = t.getJobID().toString();
|
|
|
String taskId = t.getTaskID().toString();
|
|
|
boolean isCleanup = t.isTaskCleanupTask();
|
|
|
+ String user = t.getUser();
|
|
|
StringBuffer childMapredLocalDir =
|
|
|
new StringBuffer(localDirs[0] + Path.SEPARATOR
|
|
|
- + getLocalTaskDir(localJobDir, taskId, isCleanup));
|
|
|
+ + getLocalTaskDir(user, jobId, taskId, isCleanup));
|
|
|
for (int i = 1; i < localDirs.length; i++) {
|
|
|
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
|
|
|
- + getLocalTaskDir(localJobDir, taskId, isCleanup));
|
|
|
+ + getLocalTaskDir(user, jobId, taskId, isCleanup));
|
|
|
}
|
|
|
LOG.debug(MRConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
|
|
|
conf.set(MRConfig.LOCAL_DIR, childMapredLocalDir.toString());
|
|
@@ -976,9 +978,10 @@ public class LocalJobRunner implements ClientProtocol {
|
|
|
static final String TASK_CLEANUP_SUFFIX = ".cleanup";
|
|
|
static final String JOBCACHE = "jobcache";
|
|
|
|
|
|
- static String getLocalTaskDir(Path localJobDir, String taskid,
|
|
|
+ static String getLocalTaskDir(String user, String jobid, String taskid,
|
|
|
boolean isCleanupAttempt) {
|
|
|
- String taskDir = localJobDir.toString() + Path.SEPARATOR + taskid;
|
|
|
+ String taskDir = jobDir + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
|
|
|
+ + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
|
|
|
if (isCleanupAttempt) {
|
|
|
taskDir = taskDir + TASK_CLEANUP_SUFFIX;
|
|
|
}
|