|
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
import org.apache.hadoop.mapred.TaskController;
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
import org.apache.hadoop.mapred.TaskLog;
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
import org.apache.hadoop.mapred.TaskTracker;
|
|
-import org.apache.hadoop.mapreduce.JobID;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
*
|
|
*
|
|
@@ -112,15 +111,17 @@ public class Localizer {
|
|
if (fs.exists(userDir) || fs.mkdirs(userDir)) {
|
|
if (fs.exists(userDir) || fs.mkdirs(userDir)) {
|
|
|
|
|
|
// Set permissions on the user-directory
|
|
// Set permissions on the user-directory
|
|
- fs.setPermission(userDir, new FsPermission((short)0700));
|
|
|
|
|
|
+ FsPermission userOnly = new FsPermission((short) 0700);
|
|
|
|
+ FileUtil.setPermission(new File(userDir.toUri().getPath()),
|
|
|
|
+ userOnly);
|
|
userDirStatus = true;
|
|
userDirStatus = true;
|
|
|
|
|
|
// Set up the jobcache directory
|
|
// Set up the jobcache directory
|
|
- Path jobCacheDir =
|
|
|
|
- new Path(localDir, TaskTracker.getJobCacheSubdir(user));
|
|
|
|
- if (fs.exists(jobCacheDir) || fs.mkdirs(jobCacheDir)) {
|
|
|
|
|
|
+ File jobCacheDir =
|
|
|
|
+ new File(localDir, TaskTracker.getJobCacheSubdir(user));
|
|
|
|
+ if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
|
|
// Set permissions on the jobcache-directory
|
|
// Set permissions on the jobcache-directory
|
|
- fs.setPermission(jobCacheDir, new FsPermission((short)0700));
|
|
|
|
|
|
+ FileUtil.setPermission(jobCacheDir, userOnly);
|
|
jobCacheDirStatus = true;
|
|
jobCacheDirStatus = true;
|
|
} else {
|
|
} else {
|
|
LOG.warn("Unable to create job cache directory : "
|
|
LOG.warn("Unable to create job cache directory : "
|
|
@@ -128,11 +129,12 @@ public class Localizer {
|
|
}
|
|
}
|
|
|
|
|
|
// Set up the cache directory used for distributed cache files
|
|
// Set up the cache directory used for distributed cache files
|
|
- Path distributedCacheDir =
|
|
|
|
- new Path(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
|
|
|
|
- if (fs.exists(distributedCacheDir) || fs.mkdirs(distributedCacheDir)) {
|
|
|
|
|
|
+ File distributedCacheDir =
|
|
|
|
+ new File(localDir,
|
|
|
|
+ TaskTracker.getPrivateDistributedCacheDir(user));
|
|
|
|
+ if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
|
|
// Set permissions on the distcache-directory
|
|
// Set permissions on the distcache-directory
|
|
- fs.setPermission(distributedCacheDir, new FsPermission((short)0700));
|
|
|
|
|
|
+ FileUtil.setPermission(distributedCacheDir, userOnly);
|
|
distributedCacheDirStatus = true;
|
|
distributedCacheDirStatus = true;
|
|
} else {
|
|
} else {
|
|
LOG.warn("Unable to create distributed-cache directory : "
|
|
LOG.warn("Unable to create distributed-cache directory : "
|
|
@@ -163,51 +165,6 @@ public class Localizer {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Prepare the job directories for a given job. To be called by the job
|
|
|
|
- * localization code, only if the job is not already localized.
|
|
|
|
- *
|
|
|
|
- * <br>
|
|
|
|
- * Here, we set 700 permissions on the job directories created on all disks.
|
|
|
|
- * This we do so as to avoid any misuse by other users till the time
|
|
|
|
- * {@link TaskController#initializeJob} is run at a
|
|
|
|
- * later time to set proper private permissions on the job directories. <br>
|
|
|
|
- *
|
|
|
|
- * @param user
|
|
|
|
- * @param jobId
|
|
|
|
- * @throws IOException
|
|
|
|
- */
|
|
|
|
- public void initializeJobDirs(String user, JobID jobId)
|
|
|
|
- throws IOException {
|
|
|
|
- boolean initJobDirStatus = false;
|
|
|
|
- String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
|
|
|
|
- for (String localDir : localDirs) {
|
|
|
|
- Path jobDir = new Path(localDir, jobDirPath);
|
|
|
|
- if (fs.exists(jobDir)) {
|
|
|
|
- // this will happen on a partial execution of localizeJob. Sometimes
|
|
|
|
- // copying job.xml to the local disk succeeds but copying job.jar might
|
|
|
|
- // throw out an exception. We should clean up and then try again.
|
|
|
|
- fs.delete(jobDir, true);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- boolean jobDirStatus = fs.mkdirs(jobDir);
|
|
|
|
- if (!jobDirStatus) {
|
|
|
|
- LOG.warn("Not able to create job directory " + jobDir.toString());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- initJobDirStatus = initJobDirStatus || jobDirStatus;
|
|
|
|
-
|
|
|
|
- // job-dir has to be private to the TT
|
|
|
|
- fs.setPermission(jobDir, new FsPermission((short)0700));
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if (!initJobDirStatus) {
|
|
|
|
- throw new IOException("Not able to initialize job directories "
|
|
|
|
- + "in any of the configured local directories for job "
|
|
|
|
- + jobId.toString());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
/**
|
|
/**
|
|
* Create taskDirs on all the disks. Otherwise, in some cases, like when
|
|
* Create taskDirs on all the disks. Otherwise, in some cases, like when
|
|
* LinuxTaskController is in use, child might wish to balance load across
|
|
* LinuxTaskController is in use, child might wish to balance load across
|
|
@@ -244,18 +201,4 @@ public class Localizer {
|
|
+ attemptId);
|
|
+ attemptId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
-
|
|
|
|
- /**
|
|
|
|
- * Create job log directory and set appropriate permissions for the directory.
|
|
|
|
- *
|
|
|
|
- * @param jobId
|
|
|
|
- */
|
|
|
|
- public void initializeJobLogDir(JobID jobId) throws IOException {
|
|
|
|
- Path jobUserLogDir = new Path(TaskLog.getJobDir(jobId).getCanonicalPath());
|
|
|
|
- if (!fs.mkdirs(jobUserLogDir)) {
|
|
|
|
- throw new IOException("Could not create job user log directory: " +
|
|
|
|
- jobUserLogDir);
|
|
|
|
- }
|
|
|
|
- fs.setPermission(jobUserLogDir, new FsPermission((short)0700));
|
|
|
|
- }
|
|
|
|
}
|
|
}
|